TopicDig / streamlit_app.py
m. polinsky
Update streamlit_app.py
aabe8a0 unverified
raw
history blame
8.66 kB
# streamlit_app.py manages the whole TopicDig process
from typing import List, Set
from collections import namedtuple
import random
import requests
import json
from codetiming import Timer
import streamlit as st
from digestor import Digestor
from source import Source
from scrape_sources import NPRLite, CNNText, stub
def initialize(limit, rando, use_cache=True):
clusters: dict[str:List[namedtuple]] = dict()
# This is a container for the source classes.
# Make sure you handle this. Whats the deal.
sources:List[Source]= [] # Write them and import? Read a config?
# FOR NOW ONLY add this explicitly here.
# MUST read in final version though.
sources.append(NPRLite(
'npr',
'https://text.npr.org/1001',
'sshleifer/distilbart-cnn-12-6',
'dbmdz/bert-large-cased-finetuned-conll03-english'
))
sources.append(CNNText(
'cnn',
'https://lite.cnn.com',
'sshleifer/distilbart-cnn-12-6',
'dbmdz/bert-large-cased-finetuned-conll03-english'
))
# initialize list to hold cluster data namedtuples
cluster_data: List[namedtuple('article', ['link','hed','entities', 'source'])]
article_dict : dict[str:namedtuple]
# For all sources retrieve_cluster_data
# returns List[namedtuples] with empty entity lists
cluster_data = []
article_meta = namedtuple('article_meta',['source', 'count'])
cluster_meta : List[article_meta] = []
for data_source in sources:
if limit is not None:
c_data, c_meta = data_source.retrieve_cluster_data(limit//len(sources))
else:
c_data, c_meta = data_source.retrieve_cluster_data()
cluster_data.append(c_data)
cluster_meta.append(article_meta(data_source.source_name, c_meta))
st.session_state[data_source.source_name] = f"Number of clusters from source: {data_source.source_name}\n\t{len(c_data)}"
print("Finished...moving on to clustering...")
cluster_data = cluster_data[0] + cluster_data[1]
# NER
# iterate the list of namedtuples,
for tup in cluster_data:
# pass each hed to the api query method, return the dict
# through the ner_results function to the 'entities' list.
# Populate stub entities list
perform_ner(tup, cache=use_cache)
generate_clusters(clusters, tup)
st.session_state['num_clusters'] = f"""Total number of clusters: {len(clusters)}"""
# Article stubs tracks all stubs
# If cluster is unsummarized, its hed's value is the namedtuple stub.
# Else reference digestor instance so summary can be found.
article_dict = {stub.hed: stub for stub in cluster_data}
return article_dict, clusters
# Am I going to use this for those two lines?
def perform_ner(tup:namedtuple('article',['link','hed','entities', 'source']), cache=True):
with Timer(name="ner_query_time", logger=None):
result = ner_results(ner_query(
{
"inputs":tup.hed,
"paramters":
{
"use_cache": cache,
},
}
))
for i in result:
tup.entities.append(i)
@st.cache()
def ner_query(payload):
print("making a query....")
data = json.dumps(payload)
response = requests.request("POST", NER_API_URL, headers=headers, data=data)
return json.loads(response.content.decode("utf-8"))
def generate_clusters(
the_dict: dict,
tup : namedtuple('article_stub',[ 'link','hed','entities', 'source'])
) -> dict:
for entity in tup.entities:
# Add cluster if entity not already in dict
if entity not in the_dict:
the_dict[entity] = []
# Add this article's link to the cluster dict
the_dict[entity].append(tup)
def ner_results(ner_object, groups=True, NER_THRESHOLD=0.5) -> List[str]:
# empty lists to collect our entities
people, places, orgs, misc = [], [], [], []
# 'ent' and 'designation' handle the difference between dictionary keys
# for aggregation strategy grouped vs ungrouped
ent = 'entity' if not groups else 'entity_group'
designation = 'I-' if not groups else ''
# Define actions -- this is a switch-case dictionary.
# keys are the identifiers used inthe return dict from
# the ner_query.
# values are list.append() for each of the lists
# created at the top of the function. They hold sorted entities.
# actions is used to pass entities into the lists.
# Why I called it actions I have no idea rename it.
actions = {designation+'PER':people.append,
designation+'LOC':places.append,
designation+'ORG':orgs.append,
designation+'MISC':misc.append
} # Is this an antipattern?
# For each dictionary in the ner result list, if the entity str doesn't contain a '#'
# and the confidence is > 90%, add the entity to the list for its type.
# actions[d[ent]](d['word']) accesses the key of actions that is returned
# from d[ent] and then passes the entity name, returned by d['word'] to
# the 'list.append' waiting to be called in the dict actions.
# Note the (). We access actions to call its append...
readable = [ actions[d[ent]](d['word']) for d in ner_object if '#' not in d['word'] and d['score'] > NER_THRESHOLD ]
# create list of all entities to return
ner_list = [i for i in set(people) if len(i) > 2] + [i for i in set(places) if len(i) > 2] + [i for i in set(orgs) if len(i) > 2] + [i for i in set(misc) if len(i) > 2]
return ner_list
# These could be passed through the command line
# or read from a config file.
# One of these is needed here for NER and one in Digestor for summarization.
NER_API_URL = "https://api-inference.huggingface.co/models/dbmdz/bert-large-cased-finetuned-conll03-english"
headers = {"Authorization": f"""Bearer {st.secrets['ato']}"""}
LIMIT = 20 # Controls time and number of clusters.
USE_CACHE = True
if not USE_CACHE:
print("NOT USING CACHE--ARE YOU GATHERING DATA?")
if LIMIT is not None:
print(f"LIMIT: {LIMIT}")
# digest store
digests = dict() # key is cluster, value is digestor object
out_dicts = []
# list to accept user choices
# retrieve cluster data and create dict to track each article (articleStubs)
# and create topic clusters by performing ner.
print("Initializing....")
article_dict, clusters = initialize(LIMIT, USE_CACHE)
# We now have clusters and cluster data. Redundancy.
# We call a display function and get the user input.
# For this its still streamlit.
# button to refresh topics
if st.button("Refresh topics!"):
article_dict, clusters = initialize(LIMIT, USE_CACHE)
selections = []
choices = list(clusters.keys())
choices.insert(0,'None')
st.write(st.session_state['cnn'])
st.write(st.session_state['npr'])
st.write(st.session_state['num_clusters'])
# Form used to take 3 menu inputs
with st.form(key='columns_in_form'):
cols = st.columns(3)
for i, col in enumerate(cols):
selections.append(col.selectbox(f'Make a Selection', choices, key=i))
submitted = st.form_submit_button('Submit')
if submitted:
selections = [i for i in selections if i is not None]
with st.spinner(text="Digesting...please wait, this will take a few moments...Maybe check some messages or start reading the latest papers on summarization with transformers...."):
chosen = []
for i in selections: # i is supposed to be a list of stubs, mostly one
if i != 'None':
for j in clusters[i]:
if j not in chosen:
chosen.append(j) # j is a stub.
# Digestor uses 'chosen' to create digest.
# 'user_choicese' is passed for reference.
digestor = Digestor(timer=Timer(), cache = USE_CACHE, stubs=chosen, user_choices=list(selections))
# happens internally but may be used differently so it isn't automatic upon digestor creation.
# Easily turn caching off for testing.
digestor.digest() # creates summaries and stores them associated with the digest
# Get displayable digest and digest data
digestor.build_digest()
if len(digestor.text) == 0:
st.write("You didn't select a topic!")
else:
st.write("Your digest is ready:\n")
st.write(digestor.text)
"st.session_state object:", st.session_state