Spaces:
Build error
Build error
File size: 5,014 Bytes
0827183 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
import os  # imported here because these constants sit above the file's import block

# Direct Line channel secrets used as the Bearer credential when requesting a
# conversation token. Each one can be overridden through an environment
# variable of the same name.
# NOTE(review): the literal fallbacks are credentials committed to source; they
# are kept only so existing runs keep working. Rotate them and drop the
# fallbacks once the environment variables are provisioned.
COPILOT_SECRET_1 = os.environ.get(
    "COPILOT_SECRET_1",
    """Widn_AcNmY4.ElQeiczfsgx3qm71vffR05XSNtcoYVttUFCDpQyc6pY""",
)
COPILOT_SECRET_2 = os.environ.get(
    "COPILOT_SECRET_2",
    """Widn_AcNmY4.3iJ7N6y6PkFy98kzJnGwWgMTk3cTv2XDzxB8ncc-SRw""",
)
# Display name of the bot.
COPILOT_NAME = "Dev Bot 1"
# Root of the Direct Line endpoint; request paths are appended to it.
BASE_URL = "https://europe.directline.botframework.com"
# API reference - Direct Line API 3.0
### https://learn.microsoft.com/en-us/azure/bot-service/rest-api/bot-framework-rest-direct-line-3-0-api-reference
# API reference for the Bot Framework Connector service
### https://learn.microsoft.com/en-us/azure/bot-service/rest-api/bot-framework-rest-connector-api-reference
# Known issue (Copilot Studio): "IntegratedAuthenticationNotSupportedInChannel" error
### https://community.powerplatform.com/forums/thread/details/?threadid=4be925f8-2ae0-4923-966c-3b8551db7748
# - The bot has to be published.
import requests
import os
import json
import logging,sys
class connector:
    """Small client for the Direct Line 3.0 REST API of a bot.

    The instance remembers the token and conversation id returned by
    ``generate`` and keeps a plain-text transcript of the messages that were
    sent and fetched through it.

    Attributes:
        token: Token returned by the last successful ``generate`` call.
        conversationId: Conversation id returned by the last successful
            ``generate`` call.
        transcript: List of ``"<sender>: <text>"`` strings.
    """

    # Seconds to wait for the service before giving up. Without a timeout
    # ``requests`` can block indefinitely on a stalled connection.
    request_timeout = 30

    def __init__(self):
        """Create a client with no token, no conversation and an empty transcript."""
        self.token = None
        self.conversationId = None
        self.transcript = []

    @staticmethod
    def _url(path):
        """Join ``path`` onto ``BASE_URL`` with exactly one forward slash."""
        return f"{BASE_URL.rstrip('/')}/{path.lstrip('/')}"

    @staticmethod
    def _headers(bearer):
        """Return the JSON request headers carrying ``bearer`` as the credential."""
        return {
            'Authorization': 'Bearer ' + bearer,
            'Content-Type': 'application/json',
        }

    def generate(self) -> dict:
        """Request a new token using ``COPILOT_SECRET_1``.

        Stores ``token`` and ``conversationId`` from the response on the
        instance.

        Returns:
            The parsed JSON response.

        Raises:
            ConnectionError: If the service does not answer with HTTP 200.
        """
        print("GENERATE\n")
        logging.info('GENERATE\n')
        response = requests.post(
            self._url("v3/directline/tokens/generate"),
            headers=self._headers(COPILOT_SECRET_1),
            timeout=self.request_timeout,
        )
        if response.status_code != 200:
            raise ConnectionError(
                f"Direct Line token generation failed with HTTP {response.status_code}"
            )
        content = json.loads(response.content)
        # Store the plain values (a trailing comma here would create a tuple).
        self.conversationId = content['conversationId']
        self.token = content['token']
        return content

    def start_conversation(self, token: str):
        """Open a conversation authorised by ``token``.

        Returns:
            The parsed JSON response on HTTP 200/201, otherwise ``None`` (the
            status code is printed in that case).
        """
        print("\nSTART CONVERSATION\n")
        response = requests.post(
            self._url("v3/directline/conversations"),
            headers=self._headers(token),
            timeout=self.request_timeout,
        )
        if response.status_code in (200, 201):
            content = json.loads(response.content)
            print(content)
            return content
        # Report the failure the same way ``refresh`` does instead of
        # returning None without any trace.
        print(response.status_code)
        return None

    def refresh(self, token: str, conversationId: str):
        """Ask the service to refresh ``token``.

        Returns:
            The parsed JSON response on HTTP 200, otherwise ``None`` (the
            status code is printed in that case).
        """
        print("\nREFRESH\n")
        response = requests.post(
            self._url("v3/directline/tokens/refresh"),
            json={'conversationId': conversationId},
            headers=self._headers(token),
            timeout=self.request_timeout,
        )
        if response.status_code == 200:
            content = json.loads(response.content)
            print(content)
            return content
        print(response.status_code)
        return None

    def send_message(self, token: str, convrsationId: str, userId: str, message: str):
        """Post a text message to a conversation and record it in the transcript.

        The parameter name ``convrsationId`` (sic) is kept so that existing
        keyword callers continue to work.

        Returns:
            The parsed JSON body of the service's response.
        """
        print("\nSEND\n")
        self.transcript.append(userId + ": " + message)
        activity = {
            'type': 'message',
            'from': {'id': f'{userId}'},
            'text': f'{message}',
        }
        response = requests.post(
            self._url(f"v3/directline/conversations/{convrsationId}/activities"),
            json=activity,
            headers=self._headers(token),
            timeout=self.request_timeout,
        )
        print(response.status_code)
        payload = response.json()
        print(payload)
        return payload

    def get_message(self, token: str, conversationId: str, watermark_value=0) -> dict:
        """Fetch the activities of a conversation.

        The last activity, when it carries a sender name and a text, is
        appended to the transcript.

        Args:
            token: Bearer token for the request.
            conversationId: Conversation whose activities are fetched.
            watermark_value: Sent as the ``watermark`` query parameter when
                truthy. The default ``0`` sends no watermark, which keeps the
                previous behaviour of fetching every activity.

        Returns:
            The parsed JSON response.
        """
        print("\nFETCH\n")
        query = {'watermark': watermark_value} if watermark_value else None
        response = requests.get(
            self._url(f"v3/directline/conversations/{conversationId}/activities"),
            headers=self._headers(token),
            params=query,
            timeout=self.request_timeout,
        )
        print(response.status_code)
        content = json.loads(response.content)
        try:
            latest = content["activities"][-1]
            self.transcript.append(latest['from']['name'] + ': ' + latest['text'])
        except (KeyError, IndexError, TypeError):
            # Transcript logging is best effort: the payload may have no
            # activities, or the last activity may lack a sender name or text.
            pass
        return content
if __name__ == '__main__':
    # Demo round trip: obtain a token, open a conversation, send one message
    # and fetch the conversation's activities.
    conn = connector()
    token_info = conn.generate()
    conversation = conn.start_conversation(token_info['token'])
    if conversation is None:
        # start_conversation returns None when the service does not answer
        # with 200/201; stop here instead of failing on a None subscript.
        raise SystemExit("Could not start a Direct Line conversation.")
    conn.send_message(conversation['token'], conversation['conversationId'], 'cohitai', 'WASSUP!')
    conn.get_message(conversation['token'], conversation['conversationId'])
    # conn.refresh(conversation['token'], conversation['conversationId'])
|