Spaces:
Build error
Build error
Validify-testbot-1/botbuilder-python/libraries/botbuilder-applicationinsights/tests/test_telemetry_waterfall.py
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from unittest.mock import create_autospec, MagicMock | |
from typing import Dict | |
import aiounittest | |
from botbuilder.core.adapters import TestAdapter, TestFlow | |
from botbuilder.schema import Activity | |
from botbuilder.core import ( | |
ConversationState, | |
MemoryStorage, | |
TurnContext, | |
NullTelemetryClient, | |
) | |
from botbuilder.dialogs import ( | |
Dialog, | |
DialogInstance, | |
DialogReason, | |
DialogSet, | |
WaterfallDialog, | |
DialogTurnResult, | |
DialogTurnStatus, | |
) | |
# First message sent to the bot in the TestFlow-based tests below.
BEGIN_MESSAGE = Activity(type="message", text="begin")

# Name given to the MagicMock that stands in for the telemetry client.
MOCK_TELEMETRY = "botbuilder.applicationinsights.ApplicationInsightsTelemetryClient"
class TelemetryWaterfallTests(aiounittest.AsyncTestCase):
    """Tests for the ``track_event`` calls a ``WaterfallDialog`` makes on its telemetry client."""

    def test_none_telemetry_client(self):
        """Assigning ``None`` as the telemetry client yields a ``NullTelemetryClient``."""
        # arrange
        dialog = WaterfallDialog("myId")

        # act
        dialog.telemetry_client = None

        # assert
        self.assertEqual(type(dialog.telemetry_client), NullTelemetryClient)

    async def test_execute_sequence_waterfall_steps(self):
        """Running both steps reports ``WaterfallStart`` and one ``WaterfallStep`` per step."""
        # arrange
        # Conversation state is kept in MemoryStorage; exec_test saves it
        # explicitly at the end of every turn.
        convo_state = ConversationState(MemoryStorage())
        telemetry = MagicMock(name=MOCK_TELEMETRY)

        # Create a DialogState property, DialogSet and register the WaterfallDialog.
        dialog_state = convo_state.create_property("dialogState")
        dialogs = DialogSet(dialog_state)

        async def step1(step) -> DialogTurnResult:
            await step.context.send_activity("bot responding.")
            return Dialog.end_of_turn

        async def step2(step) -> DialogTurnResult:
            await step.context.send_activity("ending WaterfallDialog.")
            return Dialog.end_of_turn

        my_dialog = WaterfallDialog("test", [step1, step2])
        my_dialog.telemetry_client = telemetry
        dialogs.add(my_dialog)

        # Bot logic driven by the TestAdapter: continue the active dialog, or
        # begin "test" when there is none.
        async def exec_test(turn_context: TurnContext) -> None:
            dialog_context = await dialogs.create_context(turn_context)
            results = await dialog_context.continue_dialog()
            if results.status == DialogTurnStatus.Empty:
                await dialog_context.begin_dialog("test")
            elif results.status == DialogTurnStatus.Complete:
                await turn_context.send_activity(results.result)
            await convo_state.save_changes(turn_context)

        # act
        adapt = TestAdapter(exec_test)
        test_flow = TestFlow(None, adapt)
        tf2 = await test_flow.send(BEGIN_MESSAGE)
        tf3 = await tf2.assert_reply("bot responding.")
        tf4 = await tf3.send("continue")
        await tf4.assert_reply("ending WaterfallDialog.")

        # assert
        telemetry_calls = [
            ("WaterfallStart", {"DialogId": "test"}),
            ("WaterfallStep", {"DialogId": "test", "StepName": step1.__qualname__}),
            ("WaterfallStep", {"DialogId": "test", "StepName": step2.__qualname__}),
        ]
        self.assert_telemetry_calls(telemetry, telemetry_calls)

    async def test_ensure_end_dialog_called(self):
        """A turn past the last step reports ``WaterfallComplete`` and then a fresh start."""
        # arrange
        # Conversation state is kept in MemoryStorage; exec_test saves it
        # explicitly at the end of every turn.
        convo_state = ConversationState(MemoryStorage())
        telemetry = MagicMock(name=MOCK_TELEMETRY)

        # Create a DialogState property, DialogSet and register the WaterfallDialog.
        dialog_state = convo_state.create_property("dialogState")
        dialogs = DialogSet(dialog_state)

        async def step1(step) -> DialogTurnResult:
            await step.context.send_activity("step1 response")
            return Dialog.end_of_turn

        async def step2(step) -> DialogTurnResult:
            await step.context.send_activity("step2 response")
            return Dialog.end_of_turn

        my_dialog = WaterfallDialog("test", [step1, step2])
        my_dialog.telemetry_client = telemetry
        dialogs.add(my_dialog)

        # Bot logic driven by the TestAdapter: continue the active dialog and,
        # if nothing replied during this turn, begin "test" (again).
        async def exec_test(turn_context: TurnContext) -> None:
            dialog_context = await dialogs.create_context(turn_context)
            await dialog_context.continue_dialog()
            if not turn_context.responded:
                await dialog_context.begin_dialog("test", None)
            await convo_state.save_changes(turn_context)

        # act
        adapt = TestAdapter(exec_test)
        test_flow = TestFlow(None, adapt)
        tf2 = await test_flow.send(BEGIN_MESSAGE)
        tf3 = await tf2.assert_reply("step1 response")
        tf4 = await tf3.send("continue")
        tf5 = await tf4.assert_reply("step2 response")
        await tf5.send(
            "Should hit end of steps - this will restart the dialog and trigger COMPLETE event"
        )

        # assert
        telemetry_calls = [
            ("WaterfallStart", {"DialogId": "test"}),
            ("WaterfallStep", {"DialogId": "test", "StepName": step1.__qualname__}),
            ("WaterfallStep", {"DialogId": "test", "StepName": step2.__qualname__}),
            ("WaterfallComplete", {"DialogId": "test"}),
            ("WaterfallStart", {"DialogId": "test"}),
            ("WaterfallStep", {"DialogId": "test", "StepName": step1.__qualname__}),
        ]
        self.assert_telemetry_calls(telemetry, telemetry_calls)

    async def test_cancelling_waterfall_telemetry(self):
        """Ending the dialog with ``CancelCalled`` tracks one event with three properties."""
        # Arrange
        dialog_id = "waterfall"
        index = 0
        guid = "(guid)"

        async def my_waterfall_step(step) -> DialogTurnResult:
            await step.context.send_activity("step1 response")
            return Dialog.end_of_turn

        dialog = WaterfallDialog(dialog_id, [my_waterfall_step])
        telemetry_client = create_autospec(NullTelemetryClient)
        dialog.telemetry_client = telemetry_client

        dialog_instance = DialogInstance()
        dialog_instance.id = dialog_id
        dialog_instance.state = {"instanceId": guid, "stepIndex": index}

        # Act
        await dialog.end_dialog(
            TurnContext(TestAdapter(), Activity()),
            dialog_instance,
            DialogReason.CancelCalled,
        )

        # Assert
        # Check the call count first so that a missing call fails as an
        # assertion instead of an IndexError on call_args_list.
        telemetry_client.track_event.assert_called_once()
        telemetry_props = telemetry_client.track_event.call_args_list[0][0][1]

        self.assertEqual(3, len(telemetry_props))
        self.assertEqual(dialog_id, telemetry_props["DialogId"])
        self.assertEqual(my_waterfall_step.__qualname__, telemetry_props["StepName"])
        self.assertEqual(guid, telemetry_props["InstanceId"])

    def assert_telemetry_call(
        self, telemetry_mock, index: int, event_name: str, props: Dict[str, str]
    ) -> None:
        """Assert that the ``index``-th recorded ``track_event`` call matches.

        :param telemetry_mock: Mock whose ``track_event.call_args_list`` is inspected.
        :param index: Zero-based position of the call to check.
        :param event_name: Expected first positional argument of the call.
        :param props: Key/value pairs that must appear in the call's second
            positional argument; additional recorded keys are ignored.
        """
        recorded = telemetry_mock.track_event.call_args_list
        self.assertLess(
            index,
            len(recorded),
            msg=f"No track_event call at index {index}, only {len(recorded)} recorded",
        )

        args, _ = recorded[index]
        self.assertEqual(args[0], event_name)
        # Validate the container once, before any membership test relies on it.
        self.assertIsInstance(args[1], dict)

        for key, val in props.items():
            self.assertIn(
                key,
                args[1],
                msg=f"Could not find value {key} in {args[1]} for index {index}",
            )
            self.assertEqual(val, args[1][key])

    def assert_telemetry_calls(self, telemetry_mock, calls) -> None:
        """Assert that the recorded ``track_event`` calls match ``calls`` in order.

        :param telemetry_mock: Mock whose ``track_event.call_args_list`` is inspected.
        :param calls: Iterable of ``(event_name, props)`` pairs, one per expected call.
        """
        expected = list(calls)
        recorded = telemetry_mock.track_event.call_args_list

        # Compare counts up front: too few recorded calls would otherwise fail
        # with an IndexError while checking the individual calls.
        self.assertEqual(
            len(recorded),
            len(expected),
            msg=f"Found {len(recorded)} calls, testing for {len(expected)}",
        )

        for index, (event_name, props) in enumerate(expected):
            self.assert_telemetry_call(telemetry_mock, index, event_name, props)