import streamlit as st import pandas as pd import pandas as pd from tqdm import tqdm import pinecone import torch from sentence_transformers import SentenceTransformer from transformers import ( pipeline, AutoTokenizer, AutoModelForCausalLM, AutoModelForSeq2SeqLM, ) import openai import streamlit_scrollable_textbox as stx @st.experimental_singleton def get_data(): data = pd.read_csv("earnings_calls_cleaned_metadata.csv") return data # Initialize models from HuggingFace @st.experimental_singleton def get_t5_model(): return pipeline("summarization", model="t5-small", tokenizer="t5-small") @st.experimental_singleton def get_flan_t5_model(): return pipeline( "summarization", model="google/flan-t5-small", tokenizer="google/flan-t5-small" ) def get_gptj(): return pipeline( "summarization", model="EleutherAI/gpt-j-6B", tokenizer="EleutherAI/gpt-j-6B" ) @st.experimental_singleton def get_mpnet_embedding_model(): device = "cuda" if torch.cuda.is_available() else "cpu" model = SentenceTransformer( "sentence-transformers/all-mpnet-base-v2", device=device ) model.max_seq_length = 512 return model @st.experimental_singleton def get_sgpt_embedding_model(): device = "cuda" if torch.cuda.is_available() else "cpu" model = SentenceTransformer( "Muennighoff/SGPT-125M-weightedmean-nli-bitfit", device=device ) model.max_seq_length = 512 return model @st.experimental_memo def save_key(api_key): return api_key def query_pinecone( query, top_k, model, index, year, quarter, ticker, participant_type, threshold=0.25 ): if participant_type == "Company Speaker": participant = "Answer" else: participant = "Question" # generate embeddings for the query xq = model.encode([query]).tolist() if year == "All": if quarter == "All": xc = index.query( xq, top_k=top_k, filter={ "Year": { "$in": [ int("2020"), int("2019"), int("2018"), int("2017"), int("2016"), ] }, "Quarter": {"$in": ["Q1", "Q2", "Q3", "Q4"]}, "Ticker": {"$eq": ticker}, "QA_Flag": {"$eq": participant}, }, include_metadata=True, ) else: xc = index.query( xq, top_k=top_k, filter={ "Year": { "$in": [ int("2020"), int("2019"), int("2018"), int("2017"), int("2016"), ] }, "Quarter": {"$eq": quarter}, "Ticker": {"$eq": ticker}, "QA_Flag": {"$eq": participant}, }, include_metadata=True, ) else: # search pinecone index for context passage with the answer xc = index.query( xq, top_k=top_k, filter={ "Year": int(year), "Quarter": {"$eq": quarter}, "Ticker": {"$eq": ticker}, "QA_Flag": {"$eq": participant}, }, include_metadata=True, ) # filter the context passages based on the score threshold filtered_matches = [] for match in xc["matches"]: if match["score"] >= threshold: filtered_matches.append(match) xc["matches"] = filtered_matches return xc def format_query(query_results): # extract passage_text from Pinecone search result context = [result["metadata"]["Text"] for result in query_results["matches"]] return context def sentence_id_combine(data, query_results, lag=1): # Extract sentence IDs from query results ids = [result["metadata"]["Sentence_id"] for result in query_results["matches"]] # Generate new IDs by adding a lag value to the original IDs new_ids = [id + i for id in ids for i in range(-lag, lag + 1)] # Remove duplicates and sort the new IDs new_ids = sorted(set(new_ids)) # Create a list of lookup IDs by grouping the new IDs in groups of lag*2+1 lookup_ids = [ new_ids[i : i + (lag * 2 + 1)] for i in range(0, len(new_ids), lag * 2 + 1) ] # Create a list of context sentences by joining the sentences corresponding to the lookup IDs context_list = [ " ".join(data.Text.iloc[lookup_id].to_list()) for lookup_id in lookup_ids ] return context_list def text_lookup(data, sentence_ids): context = ". ".join(data.iloc[sentence_ids].to_list()) return context def generate_prompt(query_text, context_list): context = " ".join(context_list) prompt = f"""Answer the question in 6 long detailed points as accurately as possible using the provided context. Include as many key details as possible. Context: {context} Question: {query_text} Answer:""" return prompt def generate_prompt_2(query_text, context_list): context = " ".join(context_list) prompt = f""" Context information is below: --------------------- {context} --------------------- Given the context information and prior knowledge, answer this question: {query_text} Try to include as many key details as possible and format the answer in points.""" return prompt def gpt_model(prompt): response = openai.Completion.create( model="text-davinci-003", prompt=prompt, temperature=0.1, max_tokens=1024, top_p=1.0, frequency_penalty=0.5, presence_penalty=1, ) return response.choices[0].text # Transcript Retrieval def retrieve_transcript(data, year, quarter, ticker): if year == "All" or quarter == "All": row = ( data.loc[ (data.Ticker == ticker), ["File_Name"], ] .drop_duplicates() .iloc[0, 0] ) else: row = ( data.loc[ (data.Year == int(year)) & (data.Quarter == quarter) & (data.Ticker == ticker), ["File_Name"], ] .drop_duplicates() .iloc[0, 0] ) # convert row to a string and join values with "-" # row_str = "-".join(row.astype(str)) + ".txt" open_file = open( f"Transcripts/{ticker}/{row}", "r", ) file_text = open_file.read() return file_text