import streamlit as st
import spacy
import numpy as np
from gensim import corpora, models
from itertools import chain, islice
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.metrics.pairwise import cosine_similarity
from scipy.signal import argrelmax

nlp = spacy.load('en_core_web_sm')
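# Assumes the small English model is available locally; one way to get it:
#   python -m spacy download en_core_web_sm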


def window(seq, n=3):
    """Slide a window of width n over seq, yielding consecutive tuples."""
    it = iter(seq)
    result = tuple(islice(it, n))
    if len(result) == n:
        yield result
    for elem in it:
        result = result[1:] + (elem,)
        yield result
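
# Illustrative check:
#   list(window([1, 2, 3, 4, 5], n=3)) == [(1, 2, 3), (2, 3, 4), (3, 4, 5)]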


def get_depths(scores):
    """Depth score for each position: the average height of the nearest
    peak to the left and to the right, minus the score itself."""

    def climb(seq, i, mode='left'):
        # Walk from position i while the values keep rising;
        # return the value at the peak.
        if mode == 'left':
            while True:
                curr = seq[i]
                if i == 0:
                    return curr
                i = i - 1
                if not seq[i] > curr:
                    return curr

        if mode == 'right':
            while True:
                curr = seq[i]
                if i == (len(seq) - 1):
                    return curr
                i = i + 1
                if not seq[i] > curr:
                    return curr

    depths = []
    for i in range(len(scores)):
        score = scores[i]
        l_peak = climb(scores, i, mode='left')
        r_peak = climb(scores, i, mode='right')
        depth = 0.5 * (l_peak + r_peak - (2 * score))
        depths.append(depth)

    return np.array(depths)
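
# Worked example with illustrative values: for scores [0.9, 0.2, 0.8], the
# peaks around index 1 are 0.9 (left) and 0.8 (right), so
#   get_depths([0.9, 0.2, 0.8]) -> array([0., 0.65, 0.])
# Deep valleys in the similarity curve receive high depth scores.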


def get_local_maxima(depth_scores, order=1):
    """Zero out every score that is not a strict local maximum."""
    maxima_ids = argrelmax(depth_scores, order=order)[0]
    filtered_scores = np.zeros(len(depth_scores))
    filtered_scores[maxima_ids] = depth_scores[maxima_ids]
    return filtered_scores
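
# Illustrative check: only strict local peaks survive:
#   get_local_maxima(np.array([0.1, 0.5, 0.2, 0.7, 0.3]))
#   -> array([0. , 0.5, 0. , 0.7, 0. ])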


def compute_threshold(scores):
    """Mean of the nonzero scores minus half their standard deviation."""
    s = scores[np.nonzero(scores)]
    threshold = np.mean(s) - (np.std(s) / 2)
    return threshold
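
# Illustrative check: for the filtered scores above, the nonzero values are
# [0.5, 0.7], so the threshold is 0.6 - (0.1 / 2) = 0.55.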


def get_threshold_segments(scores, threshold=0.1):
    """Indices whose score clears the threshold; these mark segment boundaries."""
    segment_ids = np.where(scores >= threshold)[0]
    return segment_ids
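
# Illustrative check: with the 0.55 threshold from above, only index 3 passes:
#   get_threshold_segments(np.array([0., 0.5, 0., 0.7, 0.]), 0.55) -> array([3])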


def print_list(lst):
    for e in lst:
        st.markdown("- " + e)


st.subheader("Topic Modeling with Segmentation")

uploaded_file = st.file_uploader("Choose a text file", type=["txt"])
if uploaded_file is not None:
    st.session_state["text"] = uploaded_file.getvalue().decode('utf-8')

st.write("OR")

input_text = st.text_area(
    label="Enter text separated by newlines",
    value="",
    key="text",
    height=150,
)


button = st.button('Get Segments')
if button and input_text != "":
    # Split the input into sentences with spaCy.
    texts = input_text.split('\n')
    sents = []
    for text in texts:
        doc = nlp(text)
        for sent in doc.sents:
            sents.append(sent)

    # Lemmatize; drop stop words, punctuation, and tokens shorter than MIN_LENGTH.
    MIN_LENGTH = 3
    tokenized_sents = [
        [token.lemma_.lower() for token in sent
         if not token.is_stop and not token.is_punct
         and token.text.strip() and len(token) >= MIN_LENGTH]
        for sent in sents]

    st.write("Modeling topics:")

    np.random.seed(123)

    N_TOPICS = 5
    N_PASSES = 5

    # Fit an LDA topic model on the bag-of-words sentences.
    dictionary = corpora.Dictionary(tokenized_sents)
    bow = [dictionary.doc2bow(sent) for sent in tokenized_sents]
    topic_model = models.LdaModel(corpus=bow, id2word=dictionary,
                                  num_topics=N_TOPICS, passes=N_PASSES)

    st.write("Inferring topics ...")

    # Keep only topics assigned with probability above THRESHOLD.
    THRESHOLD = 0.05
    doc_topics = list(topic_model.get_document_topics(bow, minimum_probability=THRESHOLD))

    # Keep the k most probable topics per sentence.
    k = 3
    top_k_topics = [[t[0] for t in sorted(sent_topics, key=lambda x: x[1], reverse=True)][:k]
                    for sent_topics in doc_topics]

    # Merge the topic lists of each sliding window of sentences.
    WINDOW_SIZE = 3
    window_topics = window(top_k_topics, n=WINDOW_SIZE)
    window_topics = [list(set(chain.from_iterable(w))) for w in window_topics]

    # One-hot encode each window's topic set, then score adjacent windows by
    # cosine similarity; a dip in similarity suggests a topic shift.
    binarizer = MultiLabelBinarizer(classes=range(N_TOPICS))
    encoded_topic = binarizer.fit_transform(window_topics)

    st.write("Generating segments ...")

    sims_topic = [cosine_similarity([pair[0]], [pair[1]])[0][0]
                  for pair in zip(encoded_topic, encoded_topic[1:])]
    depths_topic = get_depths(sims_topic)
    filtered_topic = get_local_maxima(depths_topic, order=1)
    threshold_topic = compute_threshold(filtered_topic)
    threshold_segments_topic = get_threshold_segments(filtered_topic, threshold_topic)

    # Shift boundary indices back into sentence coordinates, then slice the
    # sentence list into segments and render them, separated by rules.
    segment_ids = threshold_segments_topic + WINDOW_SIZE
    segment_ids = [0] + segment_ids.tolist() + [len(sents)]
    slices = list(zip(segment_ids[:-1], segment_ids[1:]))

    segmented = [sents[s[0]:s[1]] for s in slices]

    for segment in segmented[:-1]:
        print_list([s.text for s in segment])
        st.markdown("---")

    print_list([s.text for s in segmented[-1]])