import json
import re
from urllib.parse import quote

import requests

# Citation markers such as "[12]" that the summarizer embeds in its text.
_CITATION_RE = re.compile(r"\[\d+\]")
# Whitespace left dangling in front of a period once a citation is removed.
_SPACE_BEFORE_PERIOD_RE = re.compile(r"\s+\.")


def extract_between_tags(text, start_tag, end_tag):
    """Return the text found between the first start_tag and the next end_tag.

    Args:
        text: String to search.
        start_tag: Marker that opens the span.
        end_tag: Marker that closes the span; only occurrences after the
            start tag are considered.

    Returns:
        The substring strictly between the two markers, or an empty string
        when either marker is missing.
    """
    start_index = text.find(start_tag)
    if start_index == -1:
        return ""
    content_start = start_index + len(start_tag)
    # find() already returns the position where end_tag begins, so the slice
    # must stop exactly there (subtracting len(end_tag) would drop content).
    end_index = text.find(end_tag, content_start)
    if end_index == -1:
        return ""
    return text[content_start:end_index]


def _json_escape(value):
    """Escape value so it can be embedded inside a JSON string literal."""
    # json.dumps adds the surrounding quotes; strip them and keep the escapes.
    return json.dumps(str(value), ensure_ascii=False)[1:-1]


class VectaraQuery():
    """Client that sends debate-style chat queries to the Vectara streaming API."""

    _GENERIC_ERROR = "Sorry, something went wrong in my brain. Please try again later."
    _QUOTA_ERROR = 'Sorry, Vectara chat turns exceeds plan limit.'
    # Number of trailing characters held back while streaming so that a
    # citation marker split across two chunks can still be removed.
    _HOLDBACK = 50

    def __init__(self, api_key: str, customer_id: str, corpus_id: str, prompt_name: str = None):
        """Store the credentials and pick the summarizer prompt.

        Args:
            api_key: Value sent in the "x-api-key" header.
            customer_id: Value sent in the "customer-id" header and corpus key.
            corpus_id: Corpus to query.
            prompt_name: Summarizer prompt name; a built-in default is used
                when omitted or empty.
        """
        self.customer_id = customer_id
        self.corpus_id = corpus_id
        self.api_key = api_key
        self.prompt_name = prompt_name if prompt_name else "vectara-experimental-summary-ext-2023-12-11-large"
        # Conversation id returned by the service; None starts a new chat.
        self.conv_id = None

    def get_body(self, user_response: str, role: str, topic: str):
        """Build the JSON-serializable request body for a query.

        Args:
            user_response: The last argument made, which the reply answers.
            role: Stance text inserted into the prompt before the topic.
            topic: Debate topic.

        Returns:
            A dict ready to be serialized with json.dumps.
        """
        corpora_key_list = [{
            'customer_id': self.customer_id,
            'corpus_id': self.corpus_id,
            'lexical_interpolation_config': {'lambda': 0.025},
        }]

        # The prompt is itself JSON text, so caller-supplied values must be
        # escaped or a quote/backslash/newline in them would corrupt it.
        # NOTE(review): "$" / "#" in user text may still be interpreted by the
        # prompt template engine - confirm against the service documentation.
        topic = _json_escape(topic)
        role = _json_escape(role)
        user_response = _json_escape(user_response)

        prompt = f'''
        [
            {{
                "role": "system",
                "content": "You are a professional debate bot. You are provided with search results related to {topic} and respond to the previous arugments made so far. Be sure to provide a thoughtful and convincing reply. Never mention search results explicitly in your response. Do not base your response on information or knowledge that is not in the search results. Respond while demonstrating respect to the other party and the topic. Limit your responses to not more than 3 paragraphs."
            }},
            {{
                "role": "user",
                "content": "
                #foreach ($qResult in $vectaraQueryResults)
                    Search result $esc.java(${{foreach.index}}+1): $esc.java(${{qResult.getText()}})
                #end
                "
            }},
            {{
                "role": "user",
                "content": "provide a convincing reply {role} {topic}. Consider the search results as relevant information with which to form your response. Do not repeat earlier arguments and make sure your new response is coherent with the previous arguments, and responsive to the last argument: {user_response}."
            }}
        ]
        '''

        return {
            'query': [
                {
                    'query': "how would you respond?",
                    'start': 0,
                    'numResults': 50,
                    'corpusKey': corpora_key_list,
                    'context_config': {
                        'sentences_before': 2,
                        'sentences_after': 2,
                        'start_tag': "%START_SNIPPET%",
                        'end_tag': "%END_SNIPPET%",
                    },
                    'rerankingConfig': {
                        'rerankerId': 272725718,
                        'mmrConfig': {
                            'diversityBias': 0.3
                        }
                    },
                    'summary': [
                        {
                            'responseLang': 'eng',
                            'maxSummarizedResults': 7,
                            'summarizerPromptName': self.prompt_name,
                            'promptText': prompt,
                            'chat': {
                                'store': True,
                                'conversationId': self.conv_id
                            },
                        }
                    ]
                }
            ]
        }

    def get_headers(self):
        """Return the HTTP headers sent with every query."""
        return {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "customer-id": self.customer_id,
            "x-api-key": self.api_key,
            "grpc-timeout": "60S"
        }

    @staticmethod
    def _clean_summary_text(text):
        """Strip citation markers and the stray space they leave before periods."""
        text = _CITATION_RE.sub("", text)
        return _SPACE_BEFORE_PERIOD_RE.sub(".", text)

    def _stream_summary(self, lines):
        """Yield cleaned summary text from an iterable of streamed JSON lines.

        Args:
            lines: Iterable of UTF-8 encoded JSON documents (bytes); empty
                items are skipped.

        Yields:
            Pieces of the summary with citation markers removed, or a single
            apology message when the chat part of a message reports a status.

        Returns:
            The concatenation of all yielded summary pieces, or the apology
            message on failure (available as the generator's return value).
        """
        emitted = []
        pending = ""
        for line in lines:
            if not line:
                # Keep-alive blank line.
                continue
            res = json.loads(line.decode('utf-8'))['result']
            if res['responseSet'] is not None:
                # Messages carrying search results hold no summary text.
                continue
            summary = res.get('summary', None)
            if not summary:
                continue

            chat = summary.get('chat', None)
            if chat and chat.get('status', None):
                st_code = chat['status']
                print(f"Chat query failed with code {st_code}")
                if st_code == 'RESOURCE_EXHAUSTED':
                    # Drop the conversation so the next query starts afresh.
                    self.conv_id = None
                    message = self._QUOTA_ERROR
                else:
                    message = self._GENERIC_ERROR
                # Yield as well as return: a plain return value is invisible
                # to callers that simply iterate over the generator.
                yield message
                return message
            conv_id = chat.get('conversationId', None) if chat else None
            if conv_id:
                self.conv_id = conv_id

            pending += summary['text']
            if len(pending) > self._HOLDBACK:
                pending = self._clean_summary_text(pending)
                ready = pending[:-self._HOLDBACK]
                pending = pending[-self._HOLDBACK:]
                # Cleaning may shrink the buffer below the hold-back size.
                if ready:
                    emitted.append(ready)
                    yield ready
            if summary['done']:
                break

        # Flush the held-back tail with the same cleaning as streamed pieces.
        pending = self._clean_summary_text(pending)
        if pending:
            emitted.append(pending)
            yield pending
        return ''.join(emitted)

    def submit_query(self, query_str: str, role: str, topic: str):
        """Send a query and stream back the generated reply.

        Args:
            query_str: The last argument made, which the reply answers.
            role: Stance text inserted into the prompt before the topic.
            topic: Debate topic.

        Yields:
            Pieces of the reply text, or a single apology message when the
            HTTP request or the chat turn fails.

        Returns:
            The full reply text (or the apology message) as the generator's
            return value.
        """
        endpoint = "https://api.vectara.io/v1/stream-query"
        body = self.get_body(query_str, role, topic)
        # The context manager releases the streamed connection even when the
        # consumer stops early or the stream ends before the body is drained.
        with requests.post(endpoint, data=json.dumps(body), verify=True,
                           headers=self.get_headers(), stream=True) as response:
            if response.status_code != 200:
                print(f"Query failed with code {response.status_code}, reason {response.reason}, text {response.text}")
                yield self._GENERIC_ERROR
                return self._GENERIC_ERROR
            return (yield from self._stream_summary(response.iter_lines()))