"""Client helpers for querying Vectara and turning citations into links."""

import json
import re
from urllib.parse import quote

import requests

# Markers requested around the matched snippet in each search result; the
# same values are sent in the query body and used to parse the response.
START_TAG = "%START_SNIPPET%"
END_TAG = "%END_SNIPPET%"

GENERIC_ERROR_MESSAGE = "Sorry, something went wrong in my brain. Please try again later."
CHAT_LIMIT_MESSAGE = 'Sorry, Vectara chat turns exceeds plan limit.'


def extract_between_tags(text, start_tag, end_tag):
    """Return the part of *text* enclosed by *start_tag* and *end_tag*.

    Only the first occurrence of *start_tag* and the first *end_tag* after it
    are considered.  If *start_tag* is absent the result starts at the
    beginning of *text*; if *end_tag* is absent it runs to the end of *text*.
    """
    start_index = text.find(start_tag)
    content_start = 0 if start_index == -1 else start_index + len(start_tag)
    # ``find`` returns the index where the end tag begins, which is exactly
    # where the enclosed content stops -- no further adjustment is needed.
    end_index = text.find(end_tag, content_start)
    if end_index == -1:
        end_index = len(text)
    return text[content_start:end_index]


def _strip_citations(text):
    """Remove ``[n]`` citation markers and whitespace left before a period."""
    text = re.sub(r"\[\d+\]", "", text)
    return re.sub(r"\s+\.", ".", text)


class CitationNormalizer:
    """Rewrite ``[n]`` citations in a summary as numbered markdown links.

    ``responses`` is the list of search results the citations refer to
    (1-based) and ``docs`` the list of documents those results index into via
    their ``documentIndex``.  ``refs`` collects the unique link URLs in order
    of first appearance and persists across calls on the same instance.
    """

    def __init__(self, responses, docs):
        self.docs = docs
        self.responses = responses
        self.refs = []

    def _citation_url(self, response_num):
        """Return the link URL for a 1-based citation number, or ``None``.

        ``None`` is returned when the number does not refer to an available
        response or when the cited document has no ``url`` metadata entry.
        """
        if not 1 <= response_num <= len(self.responses):
            return None
        response = self.responses[response_num - 1]
        doc_num = response['documentIndex']
        metadata = {item['name']: item['value'] for item in self.docs[doc_num]['metadata']}
        if 'url' not in metadata:
            return None
        text = extract_between_tags(response['text'], START_TAG, END_TAG)
        return f"{metadata['url']}#:~:text={quote(text)}"

    def normalize_citations(self, summary):
        """Return *summary* with each ``[n]`` citation replaced by a link.

        Citations that resolve to a URL become ``[\\[k\\]](url)`` where ``k``
        is the position of that URL in ``self.refs``.  Citations that cannot
        be resolved to a URL are removed from the text.
        """
        # find all references in the summary
        pattern = r'\[\d{1,2}\]'
        matches = [match.span() for match in re.finditer(pattern, summary)]
        urls = [self._citation_url(int(summary[start + 1:end - 1])) for start, end in matches]

        # figure out unique list of references
        for url in urls:
            if url is not None and url not in self.refs:
                self.refs.append(url)
        refs_dict = {url: (inx + 1) for inx, url in enumerate(self.refs)}

        # replace references with markdown links; work right-to-left so the
        # spans of not-yet-processed matches stay valid as the text changes
        for (start, end), url in zip(reversed(matches), reversed(urls)):
            if url is None:
                replacement = ''
            else:
                replacement = f'[\\[{refs_dict[url]}\\]]({url})'
            summary = summary[:start] + replacement + summary[end:]

        return summary


class VectaraQuery:
    """Issue summarizing chat queries against one or more Vectara corpora.

    The conversation id returned by the service is kept in ``conv_id`` and
    sent with the next query.
    """

    def __init__(self, api_key: str, customer_id: str, corpus_ids: list[str], prompt_name: str = None):
        self.customer_id = customer_id
        self.corpus_ids = corpus_ids
        self.api_key = api_key
        self.prompt_name = prompt_name if prompt_name else "vectara-experimental-summary-ext-2023-12-11-sml"
        self.conv_id = None

    def get_body(self, query_str: str):
        """Build the JSON-serializable request body for *query_str*."""
        corpora_key_list = [
            {
                'customer_id': self.customer_id,
                'corpus_id': corpus_id,
                'lexical_interpolation_config': {'lambda': 0.025},
            }
            for corpus_id in self.corpus_ids
        ]

        return {
            'query': [
                {
                    'query': query_str,
                    'start': 0,
                    'numResults': 50,
                    'corpusKey': corpora_key_list,
                    'context_config': {
                        'sentences_before': 2,
                        'sentences_after': 2,
                        'start_tag': START_TAG,
                        'end_tag': END_TAG,
                    },
                    'rerankingConfig': {
                        'rerankerId': 272725718,
                        'mmrConfig': {
                            'diversityBias': 0.3
                        }
                    },
                    'summary': [
                        {
                            'responseLang': 'eng',
                            'maxSummarizedResults': 5,
                            'summarizerPromptName': self.prompt_name,
                            'chat': {
                                'store': True,
                                'conversationId': self.conv_id
                            },
                        }
                    ]
                }
            ]
        }

    def get_headers(self):
        """Return the HTTP headers sent with every query."""
        return {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "customer-id": self.customer_id,
            "x-api-key": self.api_key,
            "grpc-timeout": "60S"
        }

    def submit_query(self, query_str: str):
        """Run *query_str* and return the summary with linked citations.

        On an HTTP or chat-level failure a user-facing apology string is
        returned instead of the summary.
        """
        endpoint = "https://api.vectara.io/v1/query"
        body = self.get_body(query_str)

        response = requests.post(endpoint, data=json.dumps(body), verify=True, headers=self.get_headers())
        if response.status_code != 200:
            print(f"Query failed with code {response.status_code}, reason {response.reason}, text {response.text}")
            return GENERIC_ERROR_MESSAGE

        res = response.json()

        top_k = 10
        summary = res['responseSet'][0]['summary'][0]['text']
        responses = res['responseSet'][0]['response'][:top_k]
        docs = res['responseSet'][0]['document']
        chat = res['responseSet'][0]['summary'][0].get('chat', None)

        if chat and chat.get('status', None) is not None:
            st_code = chat['status']
            print(f"Chat query failed with code {st_code}")
            if st_code == 'RESOURCE_EXHAUSTED':
                self.conv_id = None
                return CHAT_LIMIT_MESSAGE
            return GENERIC_ERROR_MESSAGE

        self.conv_id = chat['conversationId'] if chat else None
        summary = CitationNormalizer(responses, docs).normalize_citations(summary)
        return summary

    def submit_query_streaming(self, query_str: str):
        """Run *query_str* and yield the summary text piece by piece.

        This is a generator.  Citation markers are stripped from the yielded
        text.  On an HTTP or chat-level failure the apology message is
        yielded as the only further piece, so that consumers iterating the
        stream see it.  The generator's return value (``StopIteration.value``)
        is that message on failure, or the concatenation of all yielded
        pieces on success.
        """
        endpoint = "https://api.vectara.io/v1/stream-query"
        body = self.get_body(query_str)

        chunks = []
        accumulated_text = ""  # text received but not yet yielded
        # Number of trailing characters held back on each yield so that a
        # citation marker split across two incoming chunks can still be
        # matched once the rest of it arrives.
        pattern_max_length = 50

        # The context manager releases the connection even when the loop is
        # left early (done flag, error, or the consumer closing the generator).
        with requests.post(endpoint, data=json.dumps(body), verify=True,
                           headers=self.get_headers(), stream=True) as response:
            if response.status_code != 200:
                print(f"Query failed with code {response.status_code}, reason {response.reason}, text {response.text}")
                yield GENERIC_ERROR_MESSAGE
                return GENERIC_ERROR_MESSAGE

            for line in response.iter_lines():
                if not line:  # filter out keep-alive new lines
                    continue
                data = json.loads(line.decode('utf-8'))
                res = data['result']
                if res['responseSet'] is not None:
                    # only lines without a response set are processed here
                    continue

                summary = res.get('summary', None)
                if not summary:
                    continue

                chat = summary.get('chat', None)
                if chat and chat.get('status', None):
                    st_code = chat['status']
                    print(f"Chat query failed with code {st_code}")
                    if st_code == 'RESOURCE_EXHAUSTED':
                        self.conv_id = None
                        yield CHAT_LIMIT_MESSAGE
                        return CHAT_LIMIT_MESSAGE
                    yield GENERIC_ERROR_MESSAGE
                    return GENERIC_ERROR_MESSAGE

                conv_id = chat.get('conversationId', None) if chat else None
                if conv_id:
                    self.conv_id = conv_id

                accumulated_text += summary['text']
                if len(accumulated_text) > pattern_max_length:
                    accumulated_text = _strip_citations(accumulated_text)
                    out_chunk = accumulated_text[:-pattern_max_length]
                    chunks.append(out_chunk)
                    yield out_chunk
                    accumulated_text = accumulated_text[-pattern_max_length:]

                if summary['done']:
                    break

        # yield the last piece, cleaned the same way as the earlier ones
        if len(accumulated_text) > 0:
            accumulated_text = _strip_citations(accumulated_text)
            chunks.append(accumulated_text)
            yield accumulated_text

        return ''.join(chunks)