import streamlit as st
from keybert import KeyBERT
import nltk
from nltk.corpus import stopwords
from transformers import AutoTokenizer
import os, re, json
import openai
import en_core_web_sm
from sklearn.cluster import KMeans
import numpy as np
from sentence_transformers import SentenceTransformer

MODEL = 'all-MiniLM-L6-v2'

nltk.download('stopwords', quiet=True)


# Heavyweight models are cached as shared resources rather than serialized data.
@st.cache_resource
def load_autotoken():
    # Tokenizer used only for counting tokens when chunking documents.
    autotok = AutoTokenizer.from_pretrained('facebook/bart-large-mnli')
    return autotok


@st.cache_resource
def load_keyword_model():
    sentence_model = load_model()
    kw_model = KeyBERT(model=sentence_model)
    return kw_model


@st.cache_resource
def load_model():
    embedder = SentenceTransformer(MODEL)
    return embedder


@st.cache_resource
def load_nlp():
    nlp = en_core_web_sm.load()
    return nlp


def create_nest_sentences(document: str, token_max_length=1023):
    """Split a document into lists of sentences, each list staying under the token budget."""
    nested = []
    sent = []
    length = 0
    tokenizer = load_autotoken()

    # Rough sentence splitter: break on sentence-final punctuation followed by a capital letter.
    for sentence in re.split(r'(?<=[^A-Z].[.?]) +(?=[A-Z])', document.replace("\n", '.')):
        tokens_in_sentence = tokenizer(str(sentence), truncation=False, padding=False)["input_ids"]
        length += len(tokens_in_sentence)
        if length < token_max_length:
            sent.append(sentence)
        else:
            # Current chunk is full: flush it and start a new one with this sentence,
            # counting this sentence's tokens toward the new chunk.
            nested.append(sent)
            sent = [sentence]
            length = len(tokens_in_sentence)

    if sent:
        nested.append(sent)
    return nested


def preprocess(text) -> str:
    """Lowercase the text and drop English stopwords."""
    stop_words = set(stopwords.words("english"))
    text = text.lower()
    words = [w for w in text.split() if w not in stop_words]
    return " ".join(words)


def generate_keywords(kw_model, document: str) -> list:
    """Extract unigram and 1-2 word keyphrases with KeyBERT; return them as one flat list."""
    atomic_extractions = kw_model.extract_keywords(
        document, keyphrase_ngram_range=(1, 1), stop_words=None,
        use_maxsum=True, nr_candidates=20, top_n=10)
    complex_extractions = kw_model.extract_keywords(
        document, keyphrase_ngram_range=(1, 2), stop_words=None,
        use_maxsum=True, nr_candidates=20, top_n=10)
    # extract_keywords returns (phrase, score) pairs; keep only the phrases.
    final_topics = [phrase for phrase, _ in atomic_extractions]
    final_topics += [phrase for phrase, _ in complex_extractions]
    return final_topics
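
# Example usage (sketch; the sample sentence is illustrative, not from the app):
#   kw_model = load_keyword_model()
#   generate_keywords(kw_model, "Streamlit turns Python scripts into shareable web apps.")
# Returns up to ten unigrams followed by up to ten 1-2 word phrases, e.g.
# ['streamlit', 'python', ..., 'web apps', ...]. Note that use_maxsum with
# nr_candidates=20 needs enough candidate words in the document to succeed.
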
def cluster_based_on_topics(nlp, embedder, text1: str, text2: str, num_clusters=3):
    # Preprocess and tokenize the texts
    doc1 = nlp(preprocess(text1))
    doc2 = nlp(preprocess(text2))

    # Extract sentences from the texts
    sentences1 = [sent.text for sent in doc1.sents]
    sentences2 = [sent.text for sent in doc2.sents]
    all_sentences = sentences1 + sentences2

    # Generate sentence embeddings for each sentence
    sentence_embeddings1 = embedder.encode(sentences1)
    sentence_embeddings2 = embedder.encode(sentences2)
    all_embeddings = np.concatenate((sentence_embeddings1, sentence_embeddings2), axis=0)

    # Optionally normalize the embeddings to unit length before clustering:
    # all_embeddings = all_embeddings / np.linalg.norm(all_embeddings, axis=1, keepdims=True)

    # Cluster the pooled sentences from both texts with k-means.
    clustering_model = KMeans(n_clusters=num_clusters)
    clustering_model.fit(all_embeddings)
    cluster_assignment = clustering_model.labels_

    # Group sentences by their assigned cluster id.
    clustered_sentences = {}
    for sentence_id, cluster_id in enumerate(cluster_assignment):
        clustered_sentences.setdefault(cluster_id, []).append(all_sentences[sentence_id])

    return clustered_sentences


def generate_insights(topics: dict, name1: str, name2: str, text1: str, text2: str, clusters) -> list:
    openai.api_key = os.getenv("OPENAI_API_KEY")

    with open("insights.prompt", "r") as f:
        PROMPT = f.read()

    # Fill in the static slots of the prompt template.
    PROMPT = PROMPT.replace("{{name1}}", name1)
    PROMPT = PROMPT.replace("{{name2}}", name2)
    PROMPT = PROMPT.replace("{{topic1}}", ",".join(topics['insight1'][0]))
    PROMPT = PROMPT.replace("{{topic2}}", ",".join(topics['insight2'][0]))
    PROMPT = PROMPT.replace("{{complex1}}", ",".join(topics['insight1'][1]))
    PROMPT = PROMPT.replace("{{complex2}}", ",".join(topics['insight2'][1]))

    final_insights = []
    for cluster_id, sentences in clusters.items():
        # Cap the cluster text at 4000 characters to stay within the model's context window.
        final_sentences = "\n".join(sentences)[:4000]
        final_prompt = PROMPT.replace("{{sentences}}", final_sentences)

        # Generate insights for each cluster
        response = openai.Completion.create(
            model="text-davinci-003",
            prompt=final_prompt,
            max_tokens=200,
            temperature=0.7,
            top_p=1,
            frequency_penalty=0.0,
            presence_penalty=0.0,
        )
        text = response['choices'][0]['text']
        # The prompt is expected to elicit a JSON response; parse it directly.
        jsonify = json.loads(text.strip())
        final_insights.append(jsonify)

    return final_insights
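

# Minimal end-to-end sketch of how these helpers compose. The two sample texts
# and the assumed topics-dict layout (first ten entries unigrams, the rest
# 1-2 word phrases) are illustrative, not part of the app. generate_insights()
# additionally needs an insights.prompt template file and OPENAI_API_KEY in the
# environment, so that call is left commented out.
if __name__ == "__main__":
    nlp = load_nlp()
    embedder = load_model()
    kw_model = load_keyword_model()

    text1 = ("Solar panels convert sunlight into electricity. "
             "Their cost has fallen sharply over the past decade.")
    text2 = ("Wind turbines generate power from moving air. "
             "Offshore wind farms are growing quickly.")

    kw1 = generate_keywords(kw_model, text1)
    kw2 = generate_keywords(kw_model, text2)
    topics = {
        'insight1': (kw1[:10], kw1[10:]),
        'insight2': (kw2[:10], kw2[10:]),
    }

    clusters = cluster_based_on_topics(nlp, embedder, text1, text2, num_clusters=2)
    print(clusters)
    # insights = generate_insights(topics, "Solar", "Wind", text1, text2, clusters)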