cohit's picture
Upload folder using huggingface_hub
0827183 verified
raw
history blame contribute delete
No virus
5.01 kB
COPILOT_SECRET_1 = """Widn_AcNmY4.ElQeiczfsgx3qm71vffR05XSNtcoYVttUFCDpQyc6pY"""
COPILOT_SECRET_2 = """Widn_AcNmY4.3iJ7N6y6PkFy98kzJnGwWgMTk3cTv2XDzxB8ncc-SRw"""
COPILOT_NAME = "Dev Bot 1"
BASE_URL = "https://europe.directline.botframework.com"
# API reference - Direct Line API 3.0
### https://learn.microsoft.com/en-us/azure/bot-service/rest-api/bot-framework-rest-direct-line-3-0-api-reference
# API reference for the Bot Framework Connector service
### https://learn.microsoft.com/en-us/azure/bot-service/rest-api/bot-framework-rest-connector-api-reference
# issue (Copilot Studio) with "IntegratedAuthenticationNotSupportedInChannel"
### https://community.powerplatform.com/forums/thread/details/?threadid=4be925f8-2ae0-4923-966c-3b8551db7748
# - bot has to be published.
import requests
import os
import json
import logging,sys
class connector:
def __init__(self):
self.token=None
self.conversationId=None
self.transcript=[]
def generate(self):
print("GENERATE\n")
logging.info('GENERATE\n')
URL = os.path.join(BASE_URL,"v3/directline/tokens/generate")
# Set the headers for the request
headers = {
'Authorization': 'Bearer ' + COPILOT_SECRET_1,
'Content-Type': 'application/json'
}
# Send the request to the Direct Line API
response = requests.post(URL, headers=headers)
if response.status_code == 200:
content=json.loads(response.content)
self.conversationId=content['conversationId'],
self.token=content['token']
# print(content)
return content
else:
raise ConnectionError
def start_conversation(self,token):
print("\nSTART CONVERSATION\n")
URL = "https://europe.directline.botframework.com/v3/directline/conversations"
headers = {
'Authorization': 'Bearer ' + token,
'Content-Type': 'application/json'
}
response = requests.post(URL, headers=headers)
if response.status_code == 200 or response.status_code == 201:
content = json.loads(response.content)
print(content)
return content
def refresh(self,token,conversationId):
print("\nREFRESH\n")
URL="https://europe.directline.botframework.com/v3/directline/tokens/refresh"
# Set the headers for the request
headers = {
'Authorization': 'Bearer ' + token,
'Content-Type': 'application/json'
}
# Set the parameters for the request
data = {'conversationId':conversationId}
response = requests.post(URL,json=data ,headers=headers)
if response.status_code == 200:
content=json.loads(response.content)
print(content)
return content
else:
print(response.status_code)
def send_message(self,token,convrsationId,userId,message):
print("\nSEND\n")
self.transcript.append(userId + ": " + message)
URL = f'https://europe.directline.botframework.com/v3/directline/conversations/{convrsationId}/activities'
# Set the headers for the request
headers = {
'Authorization': 'Bearer ' + token,
'Content-Type': 'application/json'
}
# Set the parameters for the request
data = {
'type': 'message',
'from': {'id': f'{userId}'},
'text': f'{message}'}
# Send the request to the Direct Line API
response = requests.post(URL, json=data, headers=headers)
# Print the response status code
print(response.status_code)
print(response.json())
def get_message(self,token,conversationId,watermark_value=0):
print("\nFETCH\n")
URL = f'https://europe.directline.botframework.com/v3/directline/conversations/{conversationId}/activities'
# Set the headers for the request
headers = {
'Authorization': 'Bearer ' + token,
'Content-Type': 'application/json'
}
# Send the request to the Direct Line API
response = requests.get(URL, headers=headers)
# Print the response status code
print(response.status_code)
# print(response.content)
content = json.loads(response.content)
try:
self.transcript.append(content["activities"][-1]['from']['name']+': ' + content["activities"][-1]['text'])
except Exception:
pass
return content
if __name__== '__main__':
conn=connector()
info=conn.generate()
info=conn.start_conversation(info['token'])
conn.send_message(info['token'],info['conversationId'],'cohitai','WASSUP!')
conn.get_message(info['token'],info['conversationId'])
# conn.refresh(info['token'],info["conversationId"])