# Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. import asyncio import pytest from botbuilder.schema import ( Activity, ActivityTypes, Attachment, AttachmentLayoutTypes, CardImage, ChannelAccount, ConversationParameters, ErrorResponseException, HeroCard, ) from botframework.connector.aio import ConnectorClient from botframework.connector.auth import MicrosoftAppCredentials from authentication_stub import MicrosoftTokenAuthenticationStub SERVICE_URL = "https://slack.botframework.com" CHANNEL_ID = "slack" BOT_NAME = "botbuilder-pc-bot" BOT_ID = "B21UTEF8S:T03CWQ0QB" RECIPIENT_ID = "U19KH8EHJ:T03CWQ0QB" CONVERSATION_ID = "B21UTEF8S:T03CWQ0QB:D2369CT7C" async def get_auth_token(): try: # pylint: disable=import-outside-toplevel from .app_creds_real import MICROSOFT_APP_PASSWORD, MICROSOFT_APP_ID # # Define a "app_creds_real.py" file with your bot credentials as follows: # # MICROSOFT_APP_ID = '...' # # MICROSOFT_APP_PASSWORD = '...' return await MicrosoftAppCredentials( MICROSOFT_APP_ID, MICROSOFT_APP_PASSWORD ).get_access_token() except ImportError: return "STUB_ACCESS_TOKEN" LOOP = asyncio.get_event_loop() AUTH_TOKEN = LOOP.run_until_complete(get_auth_token()) class TestAsyncConversation: def __init__(self): super(TestAsyncConversation, self).__init__() self.loop = asyncio.get_event_loop() self.credentials = MicrosoftTokenAuthenticationStub(AUTH_TOKEN) def test_conversations_create_conversation(self): test_object = ChannelAccount(id=RECIPIENT_ID) create_conversation = ConversationParameters( bot=ChannelAccount(id=BOT_ID), members=[test_object], activity=Activity( type=ActivityTypes.message, channel_id=CHANNEL_ID, from_property=ChannelAccount(id=BOT_ID), recipient=test_object, text="Hi there!", ), ) # creds = MicrosoftTokenAuthenticationStub(get_auth_token()) print("Printing the pointer to the generated MicrosoftAppCredentials:") # print(creds) connector = ConnectorClient(self.credentials, base_url=SERVICE_URL) try: conversation = self.loop.run_until_complete( connector.conversations.create_conversation(create_conversation) ) except Exception as error: raise error else: assert conversation.id is not None def test_conversations_create_conversation_with_invalid_bot_id_fails(self): test_object = ChannelAccount(id=RECIPIENT_ID) create_conversation = ConversationParameters( bot=ChannelAccount(id="INVALID"), members=[test_object], activity=Activity( type=ActivityTypes.message, channel_id=CHANNEL_ID, from_property=ChannelAccount(id="INVALID"), recipient=test_object, text="Hi there!", ), ) with pytest.raises(ErrorResponseException) as excinfo: connector = ConnectorClient(self.credentials, base_url=SERVICE_URL) self.loop.run_until_complete( connector.conversations.create_conversation(create_conversation) ) assert excinfo.value.error.error.code == "ServiceError" assert "Invalid userId" in str(excinfo.value.error.error.message) def test_conversations_create_conversation_without_members_fails(self): create_conversation = ConversationParameters( bot=ChannelAccount(id=BOT_ID), activity=Activity( type=ActivityTypes.message, channel_id=CHANNEL_ID, from_property=ChannelAccount(id=BOT_ID), text="Hi there!", ), members=[], ) with pytest.raises(ErrorResponseException) as excinfo: connector = ConnectorClient(self.credentials, base_url=SERVICE_URL) self.loop.run_until_complete( connector.conversations.create_conversation(create_conversation) ) assert excinfo.value.error.error.code == "BadArgument" assert "Conversations" in str(excinfo.value.error.error.message) def test_conversations_create_conversation_with_bot_as_only_member_fails(self): test_object = ChannelAccount(id=BOT_ID) sender = ChannelAccount(id=BOT_ID) create_conversation = ConversationParameters( bot=sender, members=[test_object], activity=Activity( type=ActivityTypes.message, channel_id=CHANNEL_ID, from_property=sender, recipient=test_object, text="Hi there!", ), ) with pytest.raises(ErrorResponseException) as excinfo: connector = ConnectorClient(self.credentials, base_url=SERVICE_URL) self.loop.run_until_complete( connector.conversations.create_conversation(create_conversation) ) assert excinfo.value.error.error.code == "BadArgument" assert "Bots cannot IM other bots" in str(excinfo.value.error.error.message) def test_conversations_send_to_conversation(self): activity = Activity( type=ActivityTypes.message, channel_id=CHANNEL_ID, recipient=ChannelAccount(id=RECIPIENT_ID), from_property=ChannelAccount(id=BOT_ID), text="Hello again!", ) connector = ConnectorClient(self.credentials, base_url=SERVICE_URL) try: response = self.loop.run_until_complete( connector.conversations.send_to_conversation(CONVERSATION_ID, activity) ) except Exception as error: raise error else: assert response is not None def test_conversations_send_to_conversation_with_attachment(self): card1 = HeroCard( title="A static image", text="JPEG image", images=[ CardImage( url="https://docs.com/en-us/bot-framework/media/designing-bots/core/dialogs-screens.png" ) ], ) card2 = HeroCard( title="An animation", subtitle="GIF image", images=[CardImage(url="http://i.giphy.com/Ki55RUbOV5njy.gif")], ) activity = Activity( type=ActivityTypes.message, channel_id=CHANNEL_ID, recipient=ChannelAccount(id=RECIPIENT_ID), from_property=ChannelAccount(id=BOT_ID), attachment_layout=AttachmentLayoutTypes.list, attachments=[ Attachment(content_type="application/vnd.card.hero", content=card1), Attachment(content_type="application/vnd.card.hero", content=card2), ], ) connector = ConnectorClient(self.credentials, base_url=SERVICE_URL) response = self.loop.run_until_complete( connector.conversations.send_to_conversation(CONVERSATION_ID, activity) ) assert response is not None def test_conversations_send_to_conversation_with_invalid_conversation_id_fails( self, ): activity = Activity( type=ActivityTypes.message, channel_id=CHANNEL_ID, recipient=ChannelAccount(id=RECIPIENT_ID), from_property=ChannelAccount(id=BOT_ID), text="Error!", ) with pytest.raises(ErrorResponseException) as excinfo: connector = ConnectorClient(self.credentials, base_url=SERVICE_URL) self.loop.run_until_complete( connector.conversations.send_to_conversation("123", activity) ) assert excinfo.value.error.error.code == "ServiceError" assert "cannot send messages to this id" in str( excinfo.value.error.error.message ) or "Invalid ConversationId" in str(excinfo.value.error.error.message) def test_conversations_get_conversation_members(self): connector = ConnectorClient(self.credentials, base_url=SERVICE_URL) members = self.loop.run_until_complete( connector.conversations.get_conversation_members(CONVERSATION_ID) ) assert len(members) == 2 assert members[0].name == BOT_NAME assert members[0].id == BOT_ID def test_conversations_get_conversation_members_invalid_id_fails(self): with pytest.raises(ErrorResponseException) as excinfo: connector = ConnectorClient(self.credentials, base_url=SERVICE_URL) self.loop.run_until_complete( connector.conversations.get_conversation_members("INVALID_ID") ) assert excinfo.value.error.error.code == "ServiceError" assert "cannot send messages to this id" in str( excinfo.value.error.error.message ) or "Invalid ConversationId" in str(excinfo.value.error.error.message) def test_conversations_update_activity(self): activity = Activity( type=ActivityTypes.message, channel_id=CHANNEL_ID, recipient=ChannelAccount(id=RECIPIENT_ID), from_property=ChannelAccount(id=BOT_ID), text="Updating activity...", ) activity_update = Activity( type=ActivityTypes.message, channel_id=CHANNEL_ID, recipient=ChannelAccount(id=RECIPIENT_ID), from_property=ChannelAccount(id=BOT_ID), text="Activity updated.", ) connector = ConnectorClient(self.credentials, base_url=SERVICE_URL) response = self.loop.run_until_complete( connector.conversations.send_to_conversation(CONVERSATION_ID, activity) ) activity_id = response.id response = self.loop.run_until_complete( connector.conversations.update_activity( CONVERSATION_ID, activity_id, activity_update ) ) assert response is not None assert response.id == activity_id def test_conversations_update_activity_invalid_conversation_id_fails(self): activity = Activity( type=ActivityTypes.message, channel_id=CHANNEL_ID, recipient=ChannelAccount(id=RECIPIENT_ID), from_property=ChannelAccount(id=BOT_ID), text="Updating activity...", ) activity_update = Activity( type=ActivityTypes.message, channel_id=CHANNEL_ID, recipient=ChannelAccount(id=RECIPIENT_ID), from_property=ChannelAccount(id=BOT_ID), text="Activity updated.", ) with pytest.raises(ErrorResponseException) as excinfo: connector = ConnectorClient(self.credentials, base_url=SERVICE_URL) response = self.loop.run_until_complete( connector.conversations.send_to_conversation(CONVERSATION_ID, activity) ) activity_id = response.id self.loop.run_until_complete( connector.conversations.update_activity( "INVALID_ID", activity_id, activity_update ) ) assert excinfo.value.error.error.code == "ServiceError" assert "Invalid ConversationId" in str(excinfo.value.error.error.message) def test_conversations_reply_to_activity(self): activity = Activity( type=ActivityTypes.message, channel_id=CHANNEL_ID, recipient=ChannelAccount(id=RECIPIENT_ID), from_property=ChannelAccount(id=BOT_ID), text="Thread activity", ) child_activity = Activity( type=ActivityTypes.message, channel_id=CHANNEL_ID, recipient=ChannelAccount(id=RECIPIENT_ID), from_property=ChannelAccount(id=BOT_ID), text="Child activity.", ) connector = ConnectorClient(self.credentials, base_url=SERVICE_URL) response = self.loop.run_until_complete( connector.conversations.send_to_conversation(CONVERSATION_ID, activity) ) activity_id = response.id response = self.loop.run_until_complete( connector.conversations.reply_to_activity( CONVERSATION_ID, activity_id, child_activity ) ) assert response is not None assert response.id != activity_id def test_conversations_reply_to_activity_with_invalid_conversation_id_fails(self): child_activity = Activity( type=ActivityTypes.message, channel_id=CHANNEL_ID, recipient=ChannelAccount(id=RECIPIENT_ID), from_property=ChannelAccount(id=BOT_ID), text="Child activity.", ) with pytest.raises(ErrorResponseException) as excinfo: connector = ConnectorClient(self.credentials, base_url=SERVICE_URL) self.loop.run_until_complete( connector.conversations.reply_to_activity( "INVALID_ID", "INVALID_ID", child_activity ) ) assert excinfo.value.error.error.code == "ServiceError" assert "Invalid ConversationId" in str(excinfo.value.error.error.message) def test_conversations_delete_activity(self): activity = Activity( type=ActivityTypes.message, channel_id=CHANNEL_ID, recipient=ChannelAccount(id=RECIPIENT_ID), from_property=ChannelAccount(id=BOT_ID), text="Activity to be deleted..", ) connector = ConnectorClient(self.credentials, base_url=SERVICE_URL) response = self.loop.run_until_complete( connector.conversations.send_to_conversation(CONVERSATION_ID, activity) ) activity_id = response.id response = self.loop.run_until_complete( connector.conversations.delete_activity(CONVERSATION_ID, activity_id) ) assert response is None def test_conversations_delete_activity_with_invalid_conversation_id_fails(self): with pytest.raises(ErrorResponseException) as excinfo: connector = ConnectorClient(self.credentials, base_url=SERVICE_URL) self.loop.run_until_complete( connector.conversations.delete_activity("INVALID_ID", "INVALID_ID") ) assert excinfo.value.error.error.code == "ServiceError" assert "Invalid ConversationId" in str(excinfo.value.error.error.message) def test_conversations_get_activity_members(self): activity = Activity( type=ActivityTypes.message, channel_id=CHANNEL_ID, recipient=ChannelAccount(id=RECIPIENT_ID), from_property=ChannelAccount(id=BOT_ID), text="Test Activity", ) connector = ConnectorClient(self.credentials, base_url=SERVICE_URL) response = self.loop.run_until_complete( connector.conversations.send_to_conversation(CONVERSATION_ID, activity) ) members = self.loop.run_until_complete( connector.conversations.get_activity_members(CONVERSATION_ID, response.id) ) assert len(members) == 2 assert members[0].name == BOT_NAME assert members[0].id == BOT_ID def test_conversations_get_activity_members_invalid_conversation_id_fails(self): activity = Activity( type=ActivityTypes.message, channel_id=CHANNEL_ID, recipient=ChannelAccount(id=RECIPIENT_ID), from_property=ChannelAccount(id=BOT_ID), text="Test Activity", ) with pytest.raises(ErrorResponseException) as excinfo: connector = ConnectorClient(self.credentials, base_url=SERVICE_URL) response = self.loop.run_until_complete( connector.conversations.send_to_conversation(CONVERSATION_ID, activity) ) self.loop.run_until_complete( connector.conversations.get_activity_members("INVALID_ID", response.id) ) assert excinfo.value.error.error.code == "ServiceError" assert "Invalid ConversationId" in str(excinfo.value.error.error.message)