Spaces:
Build error
Build error
import streamlit as st | |
import pandas as pd | |
import pandas as pd | |
from tqdm import tqdm | |
import pinecone | |
import torch | |
from sentence_transformers import SentenceTransformer | |
from transformers import ( | |
pipeline, | |
AutoTokenizer, | |
AutoModelForCausalLM, | |
AutoModelForSeq2SeqLM, | |
) | |
import openai | |
import streamlit_scrollable_textbox as stx | |
def get_data(): | |
data = pd.read_csv("earnings_calls_cleaned_metadata.csv") | |
return data | |
# Initialize models from HuggingFace | |
def get_t5_model(): | |
return pipeline("summarization", model="t5-small", tokenizer="t5-small") | |
def get_flan_t5_model(): | |
return pipeline( | |
"summarization", model="google/flan-t5-small", tokenizer="google/flan-t5-small" | |
) | |
def get_gptj(): | |
return pipeline( | |
"summarization", model="EleutherAI/gpt-j-6B", tokenizer="EleutherAI/gpt-j-6B" | |
) | |
def get_mpnet_embedding_model(): | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
model = SentenceTransformer( | |
"sentence-transformers/all-mpnet-base-v2", device=device | |
) | |
model.max_seq_length = 512 | |
return model | |
def get_sgpt_embedding_model(): | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
model = SentenceTransformer( | |
"Muennighoff/SGPT-125M-weightedmean-nli-bitfit", device=device | |
) | |
model.max_seq_length = 512 | |
return model | |
def save_key(api_key): | |
return api_key | |
def query_pinecone( | |
query, top_k, model, index, year, quarter, ticker, participant_type, threshold=0.25 | |
): | |
if participant_type == "Company Speaker": | |
participant = "Answer" | |
else: | |
participant = "Question" | |
# generate embeddings for the query | |
xq = model.encode([query]).tolist() | |
if year == "All": | |
if quarter == "All": | |
xc = index.query( | |
xq, | |
top_k=top_k, | |
filter={ | |
"Year": { | |
"$in": [ | |
int("2020"), | |
int("2019"), | |
int("2018"), | |
int("2017"), | |
int("2016"), | |
] | |
}, | |
"Quarter": {"$in": ["Q1", "Q2", "Q3", "Q4"]}, | |
"Ticker": {"$eq": ticker}, | |
"QA_Flag": {"$eq": participant}, | |
}, | |
include_metadata=True, | |
) | |
else: | |
xc = index.query( | |
xq, | |
top_k=top_k, | |
filter={ | |
"Year": { | |
"$in": [ | |
int("2020"), | |
int("2019"), | |
int("2018"), | |
int("2017"), | |
int("2016"), | |
] | |
}, | |
"Quarter": {"$eq": quarter}, | |
"Ticker": {"$eq": ticker}, | |
"QA_Flag": {"$eq": participant}, | |
}, | |
include_metadata=True, | |
) | |
else: | |
# search pinecone index for context passage with the answer | |
xc = index.query( | |
xq, | |
top_k=top_k, | |
filter={ | |
"Year": int(year), | |
"Quarter": {"$eq": quarter}, | |
"Ticker": {"$eq": ticker}, | |
"QA_Flag": {"$eq": participant}, | |
}, | |
include_metadata=True, | |
) | |
# filter the context passages based on the score threshold | |
filtered_matches = [] | |
for match in xc["matches"]: | |
if match["score"] >= threshold: | |
filtered_matches.append(match) | |
xc["matches"] = filtered_matches | |
return xc | |
def format_query(query_results): | |
# extract passage_text from Pinecone search result | |
context = [result["metadata"]["Text"] for result in query_results["matches"]] | |
return context | |
def sentence_id_combine(data, query_results, lag=1): | |
# Extract sentence IDs from query results | |
ids = [result["metadata"]["Sentence_id"] for result in query_results["matches"]] | |
# Generate new IDs by adding a lag value to the original IDs | |
new_ids = [id + i for id in ids for i in range(-lag, lag + 1)] | |
# Remove duplicates and sort the new IDs | |
new_ids = sorted(set(new_ids)) | |
# Create a list of lookup IDs by grouping the new IDs in groups of lag*2+1 | |
lookup_ids = [ | |
new_ids[i : i + (lag * 2 + 1)] for i in range(0, len(new_ids), lag * 2 + 1) | |
] | |
# Create a list of context sentences by joining the sentences corresponding to the lookup IDs | |
context_list = [ | |
" ".join(data.Text.iloc[lookup_id].to_list()) for lookup_id in lookup_ids | |
] | |
return context_list | |
def text_lookup(data, sentence_ids): | |
context = ". ".join(data.iloc[sentence_ids].to_list()) | |
return context | |
def generate_prompt(query_text, context_list): | |
context = " ".join(context_list) | |
prompt = f"""Answer the question in 6 long detailed points as accurately as possible using the provided context. Include as many key details as possible. | |
Context: {context} | |
Question: {query_text} | |
Answer:""" | |
return prompt | |
def generate_prompt_2(query_text, context_list): | |
context = " ".join(context_list) | |
prompt = f""" | |
Context information is below: | |
--------------------- | |
{context} | |
--------------------- | |
Given the context information and prior knowledge, answer this question: | |
{query_text} | |
Try to include as many key details as possible and format the answer in points.""" | |
return prompt | |
def gpt_model(prompt): | |
response = openai.Completion.create( | |
model="text-davinci-003", | |
prompt=prompt, | |
temperature=0.1, | |
max_tokens=1024, | |
top_p=1.0, | |
frequency_penalty=0.5, | |
presence_penalty=1, | |
) | |
return response.choices[0].text | |
# Transcript Retrieval | |
def retrieve_transcript(data, year, quarter, ticker): | |
if year == "All" or quarter == "All": | |
row = ( | |
data.loc[ | |
(data.Ticker == ticker), | |
["File_Name"], | |
] | |
.drop_duplicates() | |
.iloc[0, 0] | |
) | |
else: | |
row = ( | |
data.loc[ | |
(data.Year == int(year)) | |
& (data.Quarter == quarter) | |
& (data.Ticker == ticker), | |
["File_Name"], | |
] | |
.drop_duplicates() | |
.iloc[0, 0] | |
) | |
# convert row to a string and join values with "-" | |
# row_str = "-".join(row.astype(str)) + ".txt" | |
open_file = open( | |
f"Transcripts/{ticker}/{row}", | |
"r", | |
) | |
file_text = open_file.read() | |
return file_text | |