import streamlit as st
from keybert import KeyBERT
import nltk
from nltk.corpus import stopwords
from transformers import AutoTokenizer
import os, re, json
import openai
import spacy
import en_core_web_sm
from sklearn.cluster import KMeans, AgglomerativeClustering
import numpy as np
from sentence_transformers import SentenceTransformer

MODEL = 'all-MiniLM-L6-v2'

nltk.download('stopwords')
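# Load the facebook/bart-large-mnli tokenizer; it is used only to count tokens when chunking documents in create_nest_sentences.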
def load_autotoken():
    autotok = AutoTokenizer.from_pretrained('facebook/bart-large-mnli')
    return autotok
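# Wrap the sentence-transformer embedder in KeyBERT for keyword / keyphrase extraction.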
def load_keyword_model():
    sentence_model = load_model()
    kw_model = KeyBERT(model=sentence_model)
    return kw_model
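# Load the all-MiniLM-L6-v2 sentence-transformer used by both KeyBERT and the clustering step.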
def load_model():
    embedder = SentenceTransformer(MODEL)
    return embedder
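# Load the small English spaCy pipeline (en_core_web_sm), used for sentence segmentation.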
def load_nlp():
    nlp = en_core_web_sm.load()
    return nlp
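# Split a document into nested lists of sentences so that each group stays under token_max_length tokens (default 1023).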
def create_nest_sentences(document: str, token_max_length=1023):
    nested = []
    sent = []
    length = 0
    tokenizer = load_autotoken()
    for sentence in re.split(r'(?<=[^A-Z].[.?]) +(?=[A-Z])', document.replace("\n", '.')):
        # Count the tokens in this sentence with the Hugging Face tokenizer
        tokens_in_sentence = tokenizer(str(sentence), truncation=False, padding=False)["input_ids"]
        length += len(tokens_in_sentence)
        if length < token_max_length:
            sent.append(sentence)
        else:
            # Current chunk is full: store it and start a new one with this sentence
            nested.append(sent)
            sent = [sentence]
            length = len(tokens_in_sentence)
    if sent:
        nested.append(sent)
    return nested
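# Lowercase the text and remove English stopwords before sentence segmentation and embedding.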
def preprocess(text) -> str:
    stop_words = set(stopwords.words("english"))
    text = text.lower()
    # text = ''.join([c for c in text if c not in ('!', '.', ',', '?', ':', ';', '"', "'", '-', '(', ')')])
    words = text.split()
    words = [w for w in words if w not in stop_words]
    return " ".join(words)
def generate_keywords(kw_model, document: str) -> list:
    atomic_extractions = kw_model.extract_keywords(document, keyphrase_ngram_range=(1, 1), stop_words=None, use_maxsum=True, nr_candidates=20, top_n=10)
    complex_extractions = kw_model.extract_keywords(document, keyphrase_ngram_range=(1, 2), stop_words=None, use_maxsum=True, nr_candidates=20, top_n=10)
    final_topics = []
    for extraction in atomic_extractions:
        final_topics.append(extraction[0])
    for extraction in complex_extractions:
        final_topics.append(extraction[0])
    return final_topics
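# Embed the sentences of both texts with the sentence-transformer and group them into num_clusters k-means clusters; returns {cluster_id: [sentences]}.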
def cluster_based_on_topics(nlp, embedder, text1: str, text2: str, num_clusters=3):
    # Preprocess and tokenize the texts
    doc1 = nlp(preprocess(text1))
    doc2 = nlp(preprocess(text2))

    # Extract sentences from the texts
    sentences1 = [sent.text for sent in doc1.sents]
    sentences2 = [sent.text for sent in doc2.sents]
    all_sentences = sentences1 + sentences2

    # Generate sentence embeddings for each sentence
    sentence_embeddings1 = embedder.encode(sentences1)
    sentence_embeddings2 = embedder.encode(sentences2)
    all_embeddings = np.concatenate((sentence_embeddings1, sentence_embeddings2), axis=0)

    # Normalize the embeddings to unit length
    # all_embeddings = all_embeddings / np.linalg.norm(all_embeddings, axis=1, keepdims=True)

    # Perform k-means clustering (AgglomerativeClustering is imported as an alternative)
    clustering_model = KMeans(n_clusters=num_clusters)
    clustering_model.fit(all_embeddings)
    cluster_assignment = clustering_model.labels_

    # Group sentences by their assigned cluster
    clustered_sentences = {}
    for sentence_id, cluster_id in enumerate(cluster_assignment):
        if cluster_id not in clustered_sentences:
            clustered_sentences[cluster_id] = []
        clustered_sentences[cluster_id].append(all_sentences[sentence_id])

    return clustered_sentences
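# Fill the insights.prompt template with both names, their keyword lists, and each cluster's sentences, then ask text-davinci-003 for one JSON-formatted insight per cluster.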
def generate_insights(topics: dict, name1: str, name2: str, text1: str, text2: str, clusters) -> list:
    openai.api_key = os.getenv("OPENAI_API_KEY")
    with open("insights.prompt", "r") as f:
        PROMPT = f.read()

    # Fill in the names and the atomic/complex keyword lists for both texts
    PROMPT = PROMPT.replace("{{name1}}", name1)
    PROMPT = PROMPT.replace("{{name2}}", name2)
    PROMPT = PROMPT.replace("{{topic1}}", ",".join(topics['insight1'][0]))
    PROMPT = PROMPT.replace("{{topic2}}", ",".join(topics['insight2'][0]))
    PROMPT = PROMPT.replace("{{complex1}}", ",".join(topics['insight1'][1]))
    PROMPT = PROMPT.replace("{{complex2}}", ",".join(topics['insight2'][1]))

    final_insights = []
    for cluster_id, sentences in clusters.items():
        # Truncate each cluster's sentences to keep the prompt within the model's context window
        final_sentences = "\n".join(sentences)[:4000]
        final_prompt = PROMPT.replace("{{sentences}}", final_sentences)
        # with open(f"prompter/insights_{cluster_id}.prompt", "w") as f:
        #     f.write(final_prompt)

        # Generate insights for each cluster
        response = openai.Completion.create(
            model="text-davinci-003",
            prompt=final_prompt,
            max_tokens=200,
            temperature=0.7,
            top_p=1,
            frequency_penalty=0.0,
            presence_penalty=0.0,
        )
        text = response['choices'][0]['text']
        jsonify = json.loads(text)
        final_insights.append(jsonify)

    return final_insights
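# --- Example usage: a minimal sketch, not part of the original app ---
# Assumptions: the sentence-transformer, spaCy pipeline, and BART tokenizer can be
# downloaded, and `text_a` / `text_b` are hypothetical sample documents invented for
# illustration. `generate_insights` is not called here because it additionally needs an
# `insights.prompt` template, an OPENAI_API_KEY, and a `topics` dict shaped like
# {"insight1": [atomic_keywords, complex_keywords], "insight2": [...]}.
if __name__ == "__main__":
    embedder = load_model()
    kw_model = load_keyword_model()
    nlp = load_nlp()

    text_a = ("Transformer language models rely on self-attention to weigh interactions between tokens. "
              "Pretraining on large corpora lets these networks learn broad linguistic patterns. "
              "Fine-tuning then adapts the shared representation to downstream tasks such as summarization.")
    text_b = ("Convolutional neural networks apply local filters that slide across an image to detect edges. "
              "Pooling layers reduce spatial resolution while keeping the strongest activations. "
              "Deeper layers combine low-level features into object parts and whole objects.")

    print(generate_keywords(kw_model, text_a))                # unigram + bigram keyphrases
    print(len(create_nest_sentences(text_a + " " + text_b)))  # number of sub-1023-token chunks
    clusters = cluster_based_on_topics(nlp, embedder, text_a, text_b)
    for cluster_id, sentences in clusters.items():
        print(cluster_id, sentences)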