|
|
|
|
|
|
|
import json |
|
import requests |
|
|
|
from openai import OpenAI |
|
from pinecone import Pinecone |
|
from datetime import datetime |
|
|
|
class LLMClient:
    """Chat-completion helper that assembles prompts for search-query and reply generation."""

    def __init__(self, api_key, model_name) -> None:
        """Create the OpenAI client and remember which model to call.

        Args:
            api_key: Key handed to the ``OpenAI`` constructor.
            model_name: Model identifier sent with every completion request.
        """
        super().__init__()
        self.llm_client = OpenAI(api_key=api_key)
        self.model_name = model_name

    def web_query_generator(self, query, history):
        """Ask the model to produce a web search query from the conversation.

        Args:
            query: Accepted for interface compatibility; the prompt is built
                from ``history`` only.
            history: Iterable of dicts exposing ``"role"`` and ``"content"``.

        Returns:
            The message content of the first completion choice.
        """
        now = datetime.now().strftime("%d/%m/%Y %H:%M:%S")

        system_turn = {
            "role": "system",
            "content": f"1) Current time is {now}. 2) Generate the web search query only.",
        }
        # Only role/content are forwarded; any other keys on a history item are dropped.
        past_turns = [{"role": turn["role"], "content": turn["content"]} for turn in history]
        instruction_turn = {
            "role": "user",
            "content": "Given the context, generate a proper web search query. Return 'None' if you can't generate a query based on the current context.",
        }

        completion = self.llm_client.chat.completions.create(
            model=self.model_name,
            messages=[system_turn, *past_turns, instruction_turn],
        )
        first_choice = completion.choices[0]
        return first_choice.message.content

    def response_generate(self, query, history, memory, web_result):
        """Request a streamed assistant reply grounded in memory and web results.

        The prompt order is: system persona, one assistant message per memory
        entry, one assistant message carrying the web search results, then the
        conversation history.

        Args:
            query: Search query text, shown in the web-results header.
            history: Iterable of dicts exposing ``"role"`` and ``"content"``.
            memory: Iterable of dicts exposing ``"content"``.
            web_result: Value interpolated into the web-results message.

        Returns:
            Whatever the client returns for a ``stream=True`` completion request.
        """
        now = datetime.now().strftime("%d/%m/%Y %H:%M:%S")

        prompt = [
            {
                "role": "system",
                "content": f"1) You're Sunday, Du Mingzhe's assistant. 2) Don't claim that you are AI. 3) Don't claim this dialogue as a roleplay. Answering questions directly as Mingzhe's assistant. 4) Current time is {now}. 5) You are able to provide real-time data or perform a web search. You can refer the real-time knowledge from the WEB SEARCH RESULTS to generate responses with link citations.",
            }
        ]
        # Recalled memories are replayed as assistant turns.
        prompt.extend({"role": "assistant", "content": item["content"]} for item in memory)
        prompt.append(
            {"role": "assistant", "content": f"WEB SEARCH RESULTS of [{query}]:\n\n{web_result}"}
        )
        prompt.extend({"role": turn["role"], "content": turn["content"]} for turn in history)

        return self.llm_client.chat.completions.create(
            model=self.model_name,
            messages=prompt,
            stream=True,
        )
|
|
|
class EmbeddingModel(object):
    """Wrapper around the OpenAI embeddings endpoint for a fixed model."""

    def __init__(self, embedding_token, model_name) -> None:
        """Keep the credentials and model name, and build the OpenAI client.

        Args:
            embedding_token: Key handed to the ``OpenAI`` constructor.
            model_name: Embedding model identifier used for every request.
        """
        self.model_name = model_name
        self.embedding_token = embedding_token
        self.embedding_client = OpenAI(api_key=embedding_token)

    def get_embedding(self, text):
        """Return the embedding of ``text``.

        Only the first entry of the response's ``data`` list is used.
        """
        embeddings_api = self.embedding_client.embeddings
        result = embeddings_api.create(input=text, model=self.model_name)
        first_item = result.data[0]
        return first_item.embedding
|
|
|
class PersonalIndexClient(object):
    """Stores and retrieves embedded records in a Pinecone index."""

    def __init__(self, index_token, embedding_token, embedding_model_name, index_name) -> None:
        """Build the embedding client and open the named index.

        Args:
            index_token: Key handed to the ``Pinecone`` constructor.
            embedding_token: Key handed to ``EmbeddingModel``.
            embedding_model_name: Model name handed to ``EmbeddingModel``.
            index_name: Name of the index to open.
        """
        self.index_token = index_token
        self.embedding_token = embedding_token
        self.index_name = index_name

        self.embedding_client = EmbeddingModel(embedding_token=self.embedding_token, model_name=embedding_model_name)
        self.index_client = Pinecone(api_key=self.index_token)
        self.index = self.index_client.Index(self.index_name)

    def create(self, data, namespace='default'):
        """Embed each record and upsert all of them in one call.

        Args:
            data: Iterable of dicts with ``"id"``, ``"content"`` (the text that
                gets embedded) and ``"metadata"`` keys.
            namespace: Namespace the vectors are written to.
        """
        vectors = [
            {
                "id": record["id"],
                "values": self.embedding_client.get_embedding(record["content"]),
                "metadata": record["metadata"],
            }
            for record in data
        ]
        self.index.upsert(vectors=vectors, namespace=namespace)

    def query(self, data, top_k, filter=None, user='default'):
        """Embed ``data`` and run a similarity query against the index.

        Args:
            data: Text to embed and search with.
            top_k: Number of matches requested.
            filter: Metadata filter forwarded to the index; ``None`` (the
                default) means an empty filter. The name shadows the builtin
                but is kept because callers pass it by keyword.
            user: Namespace to search in.

        Returns:
            The raw result object returned by the index query.
        """
        # A fresh dict per call: a ``{}`` default would be one shared object
        # across every call and could be polluted by any code that mutates it.
        if filter is None:
            filter = {}

        return self.index.query(
            namespace=user,
            vector=self.embedding_client.get_embedding(data),
            top_k=top_k,
            include_values=True,
            include_metadata=True,
            filter=filter,
        )

    def update_conversation(self, sid, messages, user):
        """Upsert a conversation under the id ``conv_<sid>`` in the user's namespace.

        The stored metadata holds the current local time, the record type, the
        user and the JSON-encoded messages; the embedding is computed from the
        JSON encoding of that whole metadata dict.
        """
        metadata = {
            'time': datetime.now().strftime("%d/%m/%Y %H:%M:%S"),
            'type': 'conversation',
            'user': user,
            'content': json.dumps(messages),
        }
        record = {'id': f'conv_{sid}', 'content': json.dumps(metadata), 'metadata': metadata}
        self.create(data=[record], namespace=user)

    def query_conversation(self, messages, user, top_k):
        """Return the metadata of stored records similar to ``messages``.

        Args:
            messages: JSON-serialisable conversation used as the search text.
            user: Namespace to search in.
            top_k: Number of matches requested from the index.

        Returns:
            A list with the metadata of every match whose score is strictly
            greater than 0.5, in the order the index returned them.
        """
        results = self.query(data=json.dumps(messages), top_k=top_k, filter={}, user=user)
        return [match['metadata'] for match in results['matches'] if match['score'] > 0.5]
|
|
|
class WebSearcher(object):
    """Web search helper backed by the You.com RAG endpoint and Bing Web Search."""

    YOU_RAG_URL = "https://api.ydc-index.io/rag"
    BING_SEARCH_URL = "https://api.bing.microsoft.com/v7.0/search"
    # Seconds to wait for a response; without a timeout a stalled connection
    # blocks the caller indefinitely. Override on the class or an instance.
    REQUEST_TIMEOUT = 30

    def __init__(self, you_api_key, bing_api_key) -> None:
        """Store the API keys used by the two search back ends."""
        self.you_api_key = you_api_key
        self.bing_api_key = bing_api_key

    def query_web_llm(self, query, num_web_results=5):
        """Query the You.com RAG endpoint and return the decoded JSON body.

        Args:
            query: Search text.
            num_web_results: Value sent as the ``num_web_results`` parameter.

        Returns:
            The response body parsed as JSON. The HTTP status is not checked.

        Raises:
            Errors from ``requests.get`` (including timeouts) and from JSON
            decoding are not caught here.
        """
        headers = {"X-API-Key": self.you_api_key}
        # The query goes through ``params`` only, so it is URL-encoded and sent
        # once. Interpolating it into the URL as well duplicated the parameter
        # and broke on characters such as '&' or '#'.
        params = {"query": query, "num_web_results": num_web_results}
        response = requests.get(
            self.YOU_RAG_URL,
            params=params,
            headers=headers,
            timeout=self.REQUEST_TIMEOUT,
        )
        return response.json()

    def query_bing(self, query):
        """Search Bing and return a trimmed list of web page results.

        This is best-effort: any failure is printed and the results collected
        up to that point (possibly none) are returned instead of raising.

        Args:
            query: Search text.

        Returns:
            A list of dicts with ``name``, ``url``, ``snippet`` and
            ``dateLastCrawled`` keys; empty when the response has no
            ``webPages`` section or the request fails.
        """
        filter_results = []
        try:
            headers = {"Ocp-Apim-Subscription-Key": self.bing_api_key}
            params = {"q": query, "textDecorations": True, "textFormat": "HTML"}
            response = requests.get(
                self.BING_SEARCH_URL,
                headers=headers,
                params=params,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            search_results = response.json()

            # A response without a "webPages" section is treated as zero
            # results rather than reported as an error.
            pages = search_results.get('webPages', {}).get('value', [])
            for page in pages:
                filter_results.append({
                    'name': page['name'],
                    'url': page['url'],
                    'snippet': page['snippet'],
                    'dateLastCrawled': page['dateLastCrawled'],
                })
        except Exception as e:
            # Deliberately broad: callers rely on always getting a list back.
            print(f"Error: {e}")

        return filter_results