cohit's picture
Upload folder using huggingface_hub
0827183 verified
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from unittest.mock import create_autospec, MagicMock
from typing import Dict
import aiounittest
from botbuilder.core.adapters import TestAdapter, TestFlow
from botbuilder.schema import Activity
from botbuilder.core import (
ConversationState,
MemoryStorage,
TurnContext,
NullTelemetryClient,
)
from botbuilder.dialogs import (
Dialog,
DialogInstance,
DialogReason,
DialogSet,
WaterfallDialog,
DialogTurnResult,
DialogTurnStatus,
)
BEGIN_MESSAGE = Activity()
BEGIN_MESSAGE.text = "begin"
BEGIN_MESSAGE.type = "message"
MOCK_TELEMETRY = "botbuilder.applicationinsights.ApplicationInsightsTelemetryClient"
class TelemetryWaterfallTests(aiounittest.AsyncTestCase):
def test_none_telemetry_client(self):
# arrange
dialog = WaterfallDialog("myId")
# act
dialog.telemetry_client = None
# assert
self.assertEqual(type(dialog.telemetry_client), NullTelemetryClient)
async def test_execute_sequence_waterfall_steps(self):
# arrange
# Create new ConversationState with MemoryStorage and register the state as middleware.
convo_state = ConversationState(MemoryStorage())
telemetry = MagicMock(name=MOCK_TELEMETRY)
# Create a DialogState property, DialogSet and register the WaterfallDialog.
dialog_state = convo_state.create_property("dialogState")
dialogs = DialogSet(dialog_state)
async def step1(step) -> DialogTurnResult:
await step.context.send_activity("bot responding.")
return Dialog.end_of_turn
async def step2(step) -> DialogTurnResult:
await step.context.send_activity("ending WaterfallDialog.")
return Dialog.end_of_turn
# act
my_dialog = WaterfallDialog("test", [step1, step2])
my_dialog.telemetry_client = telemetry
dialogs.add(my_dialog)
# Initialize TestAdapter
async def exec_test(turn_context: TurnContext) -> None:
dialog_context = await dialogs.create_context(turn_context)
results = await dialog_context.continue_dialog()
if results.status == DialogTurnStatus.Empty:
await dialog_context.begin_dialog("test")
else:
if results.status == DialogTurnStatus.Complete:
await turn_context.send_activity(results.result)
await convo_state.save_changes(turn_context)
adapt = TestAdapter(exec_test)
test_flow = TestFlow(None, adapt)
tf2 = await test_flow.send(BEGIN_MESSAGE)
tf3 = await tf2.assert_reply("bot responding.")
tf4 = await tf3.send("continue")
await tf4.assert_reply("ending WaterfallDialog.")
# assert
telemetry_calls = [
("WaterfallStart", {"DialogId": "test"}),
("WaterfallStep", {"DialogId": "test", "StepName": step1.__qualname__}),
("WaterfallStep", {"DialogId": "test", "StepName": step2.__qualname__}),
]
self.assert_telemetry_calls(telemetry, telemetry_calls)
async def test_ensure_end_dialog_called(self):
# arrange
# Create new ConversationState with MemoryStorage and register the state as middleware.
convo_state = ConversationState(MemoryStorage())
telemetry = MagicMock(name=MOCK_TELEMETRY)
# Create a DialogState property, DialogSet and register the WaterfallDialog.
dialog_state = convo_state.create_property("dialogState")
dialogs = DialogSet(dialog_state)
async def step1(step) -> DialogTurnResult:
await step.context.send_activity("step1 response")
return Dialog.end_of_turn
async def step2(step) -> DialogTurnResult:
await step.context.send_activity("step2 response")
return Dialog.end_of_turn
# act
my_dialog = WaterfallDialog("test", [step1, step2])
my_dialog.telemetry_client = telemetry
dialogs.add(my_dialog)
# Initialize TestAdapter
async def exec_test(turn_context: TurnContext) -> None:
dialog_context = await dialogs.create_context(turn_context)
await dialog_context.continue_dialog()
if not turn_context.responded:
await dialog_context.begin_dialog("test", None)
await convo_state.save_changes(turn_context)
adapt = TestAdapter(exec_test)
test_flow = TestFlow(None, adapt)
tf2 = await test_flow.send(BEGIN_MESSAGE)
tf3 = await tf2.assert_reply("step1 response")
tf4 = await tf3.send("continue")
tf5 = await tf4.assert_reply("step2 response")
await tf5.send(
"Should hit end of steps - this will restart the dialog and trigger COMPLETE event"
)
# assert
telemetry_calls = [
("WaterfallStart", {"DialogId": "test"}),
("WaterfallStep", {"DialogId": "test", "StepName": step1.__qualname__}),
("WaterfallStep", {"DialogId": "test", "StepName": step2.__qualname__}),
("WaterfallComplete", {"DialogId": "test"}),
("WaterfallStart", {"DialogId": "test"}),
("WaterfallStep", {"DialogId": "test", "StepName": step1.__qualname__}),
]
self.assert_telemetry_calls(telemetry, telemetry_calls)
async def test_cancelling_waterfall_telemetry(self):
# Arrange
dialog_id = "waterfall"
index = 0
guid = "(guid)"
async def my_waterfall_step(step) -> DialogTurnResult:
await step.context.send_activity("step1 response")
return Dialog.end_of_turn
dialog = WaterfallDialog(dialog_id, [my_waterfall_step])
telemetry_client = create_autospec(NullTelemetryClient)
dialog.telemetry_client = telemetry_client
dialog_instance = DialogInstance()
dialog_instance.id = dialog_id
dialog_instance.state = {"instanceId": guid, "stepIndex": index}
# Act
await dialog.end_dialog(
TurnContext(TestAdapter(), Activity()),
dialog_instance,
DialogReason.CancelCalled,
)
# Assert
telemetry_props = telemetry_client.track_event.call_args_list[0][0][1]
self.assertEqual(3, len(telemetry_props))
self.assertEqual(dialog_id, telemetry_props["DialogId"])
self.assertEqual(my_waterfall_step.__qualname__, telemetry_props["StepName"])
self.assertEqual(guid, telemetry_props["InstanceId"])
telemetry_client.track_event.assert_called_once()
def assert_telemetry_call(
self, telemetry_mock, index: int, event_name: str, props: Dict[str, str]
) -> None:
# pylint: disable=unused-variable
args, kwargs = telemetry_mock.track_event.call_args_list[index]
self.assertEqual(args[0], event_name)
for key, val in props.items():
self.assertTrue(
key in args[1],
msg=f"Could not find value {key} in {args[1]} for index {index}",
)
self.assertTrue(isinstance(args[1], dict))
self.assertTrue(val == args[1][key])
def assert_telemetry_calls(self, telemetry_mock, calls) -> None:
index = 0
for event_name, props in calls:
self.assert_telemetry_call(telemetry_mock, index, event_name, props)
index += 1
if index != len(telemetry_mock.track_event.call_args_list):
self.assertTrue( # pylint: disable=redundant-unittest-assert
False,
f"Found {len(telemetry_mock.track_event.call_args_list)} calls, testing for {index + 1}",
)