import os import re import shutil import urllib.request from pathlib import Path from tempfile import NamedTemporaryFile import fitz import numpy as np import openai import tensorflow_hub as hub from fastapi import UploadFile from lcserve import serving from sklearn.neighbors import NearestNeighbors recommender = None def download_pdf(url, output_path): urllib.request.urlretrieve(url, output_path) def preprocess(text): text = text.replace('\n', ' ') text = re.sub('\s+', ' ', text) return text def pdf_to_text(path, start_page=1, end_page=None): doc = fitz.open(path) total_pages = doc.page_count if end_page is None: end_page = total_pages text_list = [] for i in range(start_page - 1, end_page): text = doc.load_page(i).get_text("text") text = preprocess(text) text_list.append(text) doc.close() return text_list def text_to_chunks(texts, word_length=150, start_page=1): text_toks = [t.split(' ') for t in texts] chunks = [] for idx, words in enumerate(text_toks): for i in range(0, len(words), word_length): chunk = words[i : i + word_length] if ( (i + word_length) > len(words) and (len(chunk) < word_length) and (len(text_toks) != (idx + 1)) ): text_toks[idx + 1] = chunk + text_toks[idx + 1] continue chunk = ' '.join(chunk).strip() chunk = f'[Page no. {idx+start_page}]' + ' ' + '"' + chunk + '"' chunks.append(chunk) return chunks class SemanticSearch: def __init__(self): self.use = hub.load('https://tfhub.dev/google/universal-sentence-encoder/4') self.fitted = False def fit(self, data, batch=1000, n_neighbors=5): self.data = data self.embeddings = self.get_text_embedding(data, batch=batch) n_neighbors = min(n_neighbors, len(self.embeddings)) self.nn = NearestNeighbors(n_neighbors=n_neighbors) self.nn.fit(self.embeddings) self.fitted = True def __call__(self, text, return_data=True): inp_emb = self.use([text]) neighbors = self.nn.kneighbors(inp_emb, return_distance=False)[0] if return_data: return [self.data[i] for i in neighbors] else: return neighbors def get_text_embedding(self, texts, batch=1000): embeddings = [] for i in range(0, len(texts), batch): text_batch = texts[i : (i + batch)] emb_batch = self.use(text_batch) embeddings.append(emb_batch) embeddings = np.vstack(embeddings) return embeddings def load_recommender(path, start_page=1): global recommender if recommender is None: recommender = SemanticSearch() texts = pdf_to_text(path, start_page=start_page) chunks = text_to_chunks(texts, start_page=start_page) recommender.fit(chunks) return 'Corpus Loaded.' def generate_text(openAI_key, prompt, engine="text-davinci-003"): openai.api_key = openAI_key try: completions = openai.Completion.create( engine=engine, prompt=prompt, max_tokens=512, n=1, stop=None, temperature=0.7, ) message = completions.choices[0].text except Exception as e: message = f'API Error: {str(e)}' return message def generate_answer(question, openAI_key): topn_chunks = recommender(question) prompt = "" prompt += 'search results:\n\n' for c in topn_chunks: prompt += c + '\n\n' prompt += ( "Instructions: Compose a comprehensive reply to the query using the search results given. " "Cite each reference using [ Page Number] notation (every result has this number at the beginning). " "Citation should be done at the end of each sentence. If the search results mention multiple subjects " "with the same name, create separate answers for each. Only include information found in the results and " "don't add any additional information. Make sure the answer is correct and don't output false content. " "If the text does not relate to the query, simply state 'Text Not Found in PDF'. Ignore outlier " "search results which has nothing to do with the question. Only answer what is asked. The " "answer should be short and concise. Answer step-by-step. \n\nQuery: {question}\nAnswer: " ) prompt += f"Query: {question}\nAnswer:" answer = generate_text(openAI_key, prompt, "text-davinci-003") return answer def load_openai_key() -> str: key = os.environ.get("OPENAI_API_KEY") if key is None: raise ValueError( "[ERROR]: Please pass your OPENAI_API_KEY. Get your key here : https://platform.openai.com/account/api-keys" ) return key @serving def ask_url(url: str, question: str): download_pdf(url, 'corpus.pdf') load_recommender('corpus.pdf') openAI_key = load_openai_key() return generate_answer(question, openAI_key) @serving async def ask_file(file: UploadFile, question: str) -> str: suffix = Path(file.filename).suffix with NamedTemporaryFile(delete=False, suffix=suffix) as tmp: shutil.copyfileobj(file.file, tmp) tmp_path = Path(tmp.name) load_recommender(str(tmp_path)) openAI_key = load_openai_key() return generate_answer(question, openAI_key)