"""Minimal client for talking to a Copilot Studio bot over the Direct Line 3.0 REST API.

Run as a script, it generates a token, starts a conversation, sends one
message and fetches the conversation's activities.
"""

import json
import logging
import os
import sys

import requests

# SECURITY NOTE(review): the Direct Line secrets below are committed in source.
# They can be overridden through environment variables of the same name; the
# literals are kept only as a fallback so existing setups keep working.
# Rotate these secrets and remove the literals once the environment is set up.
COPILOT_SECRET_1 = os.environ.get(
    "COPILOT_SECRET_1",
    """Widn_AcNmY4.ElQeiczfsgx3qm71vffR05XSNtcoYVttUFCDpQyc6pY""",
)
COPILOT_SECRET_2 = os.environ.get(
    "COPILOT_SECRET_2",
    """Widn_AcNmY4.3iJ7N6y6PkFy98kzJnGwWgMTk3cTv2XDzxB8ncc-SRw""",
)
COPILOT_NAME = "Dev Bot 1"
BASE_URL = "https://europe.directline.botframework.com"

# Seconds to wait for the service before giving up; without a timeout a
# stalled connection would block the caller forever.
REQUEST_TIMEOUT = 30

# API reference - Direct Line API 3.0
### https://learn.microsoft.com/en-us/azure/bot-service/rest-api/bot-framework-rest-direct-line-3-0-api-reference

# API reference for the Bot Framework Connector service
### https://learn.microsoft.com/en-us/azure/bot-service/rest-api/bot-framework-rest-connector-api-reference

# Issue (Copilot Studio) with "IntegratedAuthenticationNotSupportedInChannel"
### https://community.powerplatform.com/forums/thread/details/?threadid=4be925f8-2ae0-4923-966c-3b8551db7748
# - the bot has to be published.


def _api_url(path):
    """Return the full Direct Line v3 URL for *path* (no leading slash)."""
    # Plain string formatting, not os.path.join: the latter uses the OS path
    # separator and would produce backslashes on Windows.
    return f"{BASE_URL}/v3/directline/{path}"


def _bearer_headers(credential):
    """Return the JSON request headers carrying *credential* as a Bearer token."""
    return {
        'Authorization': 'Bearer ' + credential,
        'Content-Type': 'application/json',
    }


class connector:
    """Thin wrapper around the Direct Line endpoints used by this script.

    Attributes:
        token: token returned by the last successful ``generate`` call.
        conversationId: conversation id returned by the last successful
            ``generate`` call.
        transcript: list of ``"<sender>: <text>"`` strings collected by
            ``send_message`` and ``get_message``.
    """

    def __init__(self):
        self.token = None
        self.conversationId = None
        self.transcript = []

    def generate(self):
        """Request a token using ``COPILOT_SECRET_1``.

        Stores the returned ``token`` and ``conversationId`` on the instance.

        Returns:
            The decoded JSON response body.

        Raises:
            ConnectionError: if the service does not answer with HTTP 200.
        """
        print("GENERATE\n")
        logging.info('GENERATE\n')

        response = requests.post(
            _api_url("tokens/generate"),
            headers=_bearer_headers(COPILOT_SECRET_1),
            timeout=REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            raise ConnectionError(
                f"Token generation failed with HTTP status {response.status_code}"
            )

        content = json.loads(response.content)
        # Store the plain id (a stray trailing comma used to wrap it in a tuple).
        self.conversationId = content['conversationId']
        self.token = content['token']
        return content

    def start_conversation(self, token):
        """Open a conversation with the given token.

        Returns:
            The decoded JSON response body on HTTP 200/201, otherwise ``None``
            (the status code is printed in that case).
        """
        print("\nSTART CONVERSATION\n")

        response = requests.post(
            _api_url("conversations"),
            headers=_bearer_headers(token),
            timeout=REQUEST_TIMEOUT,
        )
        if response.status_code in (200, 201):
            content = json.loads(response.content)
            print(content)
            return content

        # Surface the failure instead of returning None without a trace.
        print(response.status_code)
        return None

    def refresh(self, token, conversationId):
        """Ask the service to refresh *token* for *conversationId*.

        Returns:
            The decoded JSON response body on HTTP 200, otherwise ``None``
            (the status code is printed in that case).
        """
        print("\nREFRESH\n")

        response = requests.post(
            _api_url("tokens/refresh"),
            json={'conversationId': conversationId},
            headers=_bearer_headers(token),
            timeout=REQUEST_TIMEOUT,
        )
        if response.status_code == 200:
            content = json.loads(response.content)
            print(content)
            return content

        print(response.status_code)
        return None

    def send_message(self, token, convrsationId, userId, message):
        """Post a text message activity to the conversation.

        The message is also appended to ``self.transcript``. The parameter
        name ``convrsationId`` is kept as-is for keyword-argument callers.

        Returns:
            The decoded JSON response body.
        """
        print("\nSEND\n")
        self.transcript.append(f"{userId}: {message}")

        payload = {
            'type': 'message',
            'from': {'id': f'{userId}'},
            'text': f'{message}',
        }
        response = requests.post(
            _api_url(f"conversations/{convrsationId}/activities"),
            json=payload,
            headers=_bearer_headers(token),
            timeout=REQUEST_TIMEOUT,
        )

        print(response.status_code)
        body = response.json()
        print(body)
        return body

    def get_message(self, token, conversationId, watermark_value=0):
        """Fetch the activities of a conversation.

        The sender name and text of the last activity are appended to
        ``self.transcript`` when both are present.

        NOTE(review): ``watermark_value`` is accepted but not sent to the
        service; presumably it was meant to become the ``watermark`` query
        parameter - confirm the intended semantics before wiring it in.

        Returns:
            The decoded JSON response body.
        """
        print("\nFETCH\n")

        response = requests.get(
            _api_url(f"conversations/{conversationId}/activities"),
            headers=_bearer_headers(token),
            timeout=REQUEST_TIMEOUT,
        )
        print(response.status_code)

        content = json.loads(response.content)
        try:
            latest = content["activities"][-1]
            self.transcript.append(latest['from']['name'] + ': ' + latest['text'])
        except (KeyError, IndexError, TypeError):
            # Best effort: no activities yet, or the last activity carries no
            # sender name / text. The transcript is simply left unchanged.
            pass
        return content


def main():
    """Run a one-message demo conversation against the bot."""
    conn = connector()
    info = conn.generate()
    info = conn.start_conversation(info['token'])
    if info is None:
        sys.exit("Could not start a Direct Line conversation.")

    conn.send_message(info['token'], info['conversationId'], 'cohitai', 'WASSUP!')
    conn.get_message(info['token'], info['conversationId'])
    # conn.refresh(info['token'],info["conversationId"])


if __name__ == '__main__':
    main()