# Source: Hugging Face Space file "app.py" by awacke1
# Commit 7e31ed4 ("Update app.py"), 12.5 kB
import csv
import os
from datetime import datetime

import faiss
import gradio as gr
import numpy as np
import pandas as pd
import spacy
import torch
import wikipedia
import wikipediaapi
from transformers import (
    AutoTokenizer,
    BlenderbotForConditionalGeneration,
    BlenderbotTokenizer,
    TFAutoModel,
)
from wikipedia.exceptions import DisambiguationError
# Load the small English spaCy pipeline, downloading it on first use.
try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
    # spacy.load raises OSError when the model package is not installed.
    # Catching only that keeps unrelated failures (and Ctrl-C) visible
    # instead of silently triggering a download.
    spacy.cli.download("en_core_web_sm")
    nlp = spacy.load("en_core_web_sm")

# Question words that get_concepts() must not report as concepts.
wh_words = ['what', 'who', 'how', 'when', 'which']
def get_concepts(text):
    """Return the noun chunks found in *text*, excluding bare wh-words.

    The text is lower-cased before parsing, so the returned chunk strings
    are lower-case as well.
    """
    parsed = nlp(text.lower())
    return [
        phrase.text
        for phrase in parsed.noun_chunks
        if phrase.text not in wh_words
    ]
def get_passages(text, k=100):
    """Split *text* into passages made of whole sentences.

    Sentences are accumulated until adding the next one would bring the
    running length (``len()`` of each spaCy sentence span) to *k* or more;
    the accumulated passage is then emitted and the sentence starts a new one.

    Args:
        text: Raw text to split.
        k: Length threshold at which a new passage is started.

    Returns:
        List of passage strings, sentences joined by a single space.
        No passage is empty and every sentence of *text* appears in
        exactly one passage.
    """
    passages = []
    current = []      # sentence texts of the passage being built
    current_len = 0   # summed len() of the sentences in `current`
    for sent in nlp(text).sents:
        sent_len = len(sent)
        # Only flush when something has been collected: a first sentence
        # that is already >= k must not produce an empty passage.
        if current and current_len + sent_len >= k:
            passages.append(" ".join(current))
            current, current_len = [], 0
        current.append(sent.text)
        current_len += sent_len
    # Flush the remainder so the final sentence(s) are never dropped.
    if current:
        passages.append(" ".join(current))
    return passages
def get_dicts_for_dpr(concepts, n_results=20, k=100):
    """Collect Wikipedia passages for every concept.

    Args:
        concepts: Iterable of search strings.
        n_results: Maximum number of Wikipedia search hits per concept.
        k: Passage length threshold forwarded to ``get_passages``.

    Returns:
        List of ``{'text': passage, 'title': page_title}`` dicts, in search
        order. Titles that resolve to a disambiguation page or to no page
        at all are skipped.
    """
    dicts = []
    for concept in concepts:
        wikis = wikipedia.search(concept, results=n_results)
        print(concept, "No of Wikis: ",len(wikis))
        for wiki in wikis:
            try:
                page = wikipedia.page(title=wiki, auto_suggest=False)
            except (DisambiguationError, wikipedia.exceptions.PageError):
                # A search hit may be ambiguous or may not resolve to a page;
                # neither should abort the whole search.
                continue
            dicts.extend(
                {'text': passage, 'title': wiki}
                for passage in get_passages(page.content, k=k)
            )
    return dicts
# Hugging Face checkpoints for the two DPR encoders (context and question).
_CTX_CHECKPOINT = "nlpconnect/dpr-ctx_encoder_bert_uncased_L-2_H-128_A-2"
_QUESTION_CHECKPOINT = "nlpconnect/dpr-question_encoder_bert_uncased_L-2_H-128_A-2"

# Encoders and their matching tokenizers are loaded once at import time.
passage_encoder = TFAutoModel.from_pretrained(_CTX_CHECKPOINT)
query_encoder = TFAutoModel.from_pretrained(_QUESTION_CHECKPOINT)
p_tokenizer = AutoTokenizer.from_pretrained(_CTX_CHECKPOINT)
q_tokenizer = AutoTokenizer.from_pretrained(_QUESTION_CHECKPOINT)
def get_title_text_combined(passage_dicts):
    """Return a ``(title, text)`` tuple for each passage dict, in input order."""
    return [(entry['title'], entry['text']) for entry in passage_dicts]
def extracted_passage_embeddings(processed_passages, max_length=156):
    """Encode passages with the DPR context encoder.

    Args:
        processed_passages: Batch passed straight to the passage tokenizer
            (``search`` supplies ``(title, text)`` tuples).
        max_length: Every input is padded/truncated to this many tokens.

    Returns:
        Whatever ``passage_encoder.predict`` returns for the batch.
    """
    encoded = p_tokenizer.batch_encode_plus(
        processed_passages,
        max_length=max_length,
        padding="max_length",
        truncation=True,
        add_special_tokens=True,
        return_token_type_ids=True,
    )
    # The encoder is fed ids, mask and segment ids, in that order.
    model_inputs = [
        np.array(encoded[field])
        for field in ('input_ids', 'attention_mask', 'token_type_ids')
    ]
    return passage_encoder.predict(model_inputs, batch_size=64, verbose=1)
def extracted_query_embeddings(queries, max_length=64):
    """Encode questions with the DPR question encoder.

    Args:
        queries: List of question strings.
        max_length: Every input is padded/truncated to this many tokens.

    Returns:
        Whatever ``query_encoder.predict`` returns for the batch.
    """
    encoded = q_tokenizer.batch_encode_plus(
        queries,
        max_length=max_length,
        padding="max_length",
        truncation=True,
        add_special_tokens=True,
        return_token_type_ids=True,
    )
    # The encoder is fed ids, mask and segment ids, in that order.
    model_inputs = [
        np.array(encoded[field])
        for field in ('input_ids', 'attention_mask', 'token_type_ids')
    ]
    return query_encoder.predict(model_inputs, batch_size=1, verbose=1)
#Wikipedia API:
def get_pagetext(page):
    """Return *page* as a string with tab characters removed.

    The previous pattern was the two-character sequence "/t", which left
    tabs in place and instead corrupted any text containing "/t"
    (for example URLs such as ".../test").
    """
    return str(page).replace("\t", "")
def get_wiki_summary(search):
    """Look up a Wikipedia page and return its key facts as a two-column table.

    Args:
        search: Page title to look up on the English Wikipedia.

    Returns:
        A ``pandas.DataFrame`` with columns ``Entity`` and ``Value`` and one
        row each for URL, Title, Summary (first 60 characters), Text,
        Backlinks, Links and Categories. When the page does not exist every
        value is ``"Not found"``. A DataFrame is returned on both paths
        because this function is wired to a single Dataframe output
        component; the earlier 5-tuple could not be rendered by it.
    """
    entities = ["URL", "Title", "Summary", "Text", "Backlinks", "Links", "Categories"]

    wiki_wiki = wikipediaapi.Wikipedia('en')
    page = wiki_wiki.page(search)
    if not page.exists():
        return pd.DataFrame({
            'Entity': entities,
            'Value': ["Not found"] * len(entities),
        })

    def joined_titles(pages):
        # Same " , "-terminated format as before, built in one pass.
        return "".join(page_title + " , " for page_title in pages)

    pageurl = page.fullurl
    pagetitle = page.title
    pagesummary = page.summary[0:60]
    pagetext = get_pagetext(page.text)
    backlink_list = joined_titles(page.backlinks)
    category_list = joined_titles(page.categories)
    link_list = joined_titles(page.links)

    return pd.DataFrame({
        'Entity': entities,
        'Value': [pageurl, pagetitle, pagesummary, pagetext,
                  backlink_list, link_list, category_list],
    })
def search(question):
    """Retrieve Wikipedia passages for *question* and rank them with DPR + FAISS.

    Returns a DataFrame of the passage dicts (``text``/``title``) in the
    order produced by the FAISS L2 index search, or an empty DataFrame when
    no passages were collected.
    """
    concepts = get_concepts(question)
    print("concepts: ",concepts)
    dicts = get_dicts_for_dpr(concepts, n_results=1)
    n_passages = len(dicts)
    print("dicts len: ", n_passages)
    if not n_passages:
        return pd.DataFrame()

    passage_embeddings = extracted_passage_embeddings(get_title_text_combined(dicts))
    query_embeddings = extracted_query_embeddings([question])

    # 128 is the index dimensionality; presumably the hidden size of the
    # "H-128" encoder checkpoints loaded by this file.
    l2_index = faiss.IndexFlatL2(128)
    l2_index.add(passage_embeddings.pooler_output)
    # Ask for every passage so the whole set comes back ranked.
    _, ranked = l2_index.search(query_embeddings.pooler_output, k=n_passages)
    return pd.DataFrame([dicts[position] for position in ranked[0]])
# AI UI SOTA - gradio blocks with UI formatting, and event driven UI
# Block documentation on event listeners, start here: https://gradio.app/blocks_and_event_listeners/
with gr.Blocks() as demo:
    gr.Markdown("<h1><center>🍰 Ultimate Wikipedia AI 🎨</center></h1>")
    # The opening <div> is now closed so the HTML fragment is well-formed.
    gr.Markdown("""<div align="center">Search and Find Anything Then Use in AI! <a href="https://www.mediawiki.org/wiki/API:Main_page">MediaWiki - API for Wikipedia</a>. <a href="https://paperswithcode.com/datasets?q=wikipedia&v=lst&o=newest">Papers,Code,Datasets for SOTA w/ Wikipedia</a></div>""")
    with gr.Row():  # question input
        inp = gr.Textbox(lines=1, default="Syd Mead", label="Question")
    with gr.Row():  # action buttons
        b3 = gr.Button("Search AI Summaries")
        b4 = gr.Button("Search Web Live")
    with gr.Row():  # output 1: ranked passages from search()
        out = gr.Dataframe(label="Answers", type="pandas")
    with gr.Row():  # output 2: Entity/Value table from get_wiki_summary()
        out_DF = gr.Dataframe(wrap=True, max_rows=1000, overflow_row_behaviour= "paginate", datatype = ["markdown", "markdown"], headers=['Entity', 'Value'])
    # Pressing Enter in the textbox behaves like "Search Web Live".
    inp.submit(fn=get_wiki_summary, inputs=inp, outputs=out_DF)
    b3.click(fn=search, inputs=inp, outputs=out)
    b4.click(fn=get_wiki_summary, inputs=inp, outputs=out_DF )

# NOTE(review): with debug=True this call is expected to block until the
# server stops, so any code that follows it in this file only runs after
# shutdown - confirm that is intended.
demo.launch(debug=True, show_error=True)
# When true, chat() persists every exchange to the CSV memory file.
UseMemory = True
# Hugging Face token read from the environment; None when the variable is unset.
HF_TOKEN = os.getenv("HF_TOKEN")
def SaveResult(text, outputfileName):
    """Append *text* as one line to *outputfileName*.

    Newlines inside *text* are replaced by spaces so each call adds exactly
    one line. If the file does not exist yet it is created and the column
    header line ``time, message, text`` is written first.

    Args:
        text: String to store.
        outputfileName: Path of the file to append to.
    """
    from os.path import exists

    print("Saving: " + text + " to " + outputfileName)
    is_new_file = not exists(outputfileName)
    # Append mode creates the file when missing, so one code path serves
    # both cases. UTF-8 is explicit so non-ASCII text does not depend on
    # the platform's default encoding.
    with open(outputfileName, "a", encoding="utf-8") as f:
        if is_new_file:
            f.write("time, message, text\n")  # header, written once per file
        f.write(text.replace("\n", " "))
        f.write('\n')
def store_message(name: str, message: str, outputfileName: str):
    """Append one timestamped row to the CSV memory file and return its contents.

    The file is created with a header line if it does not exist. A row
    ``time,message,name`` is appended only when both *name* and *message*
    are non-empty; both are stripped of surrounding whitespace.

    Args:
        name: Value stored in the third column.
        message: Value stored in the second column.
        outputfileName: Path of the CSV file.

    Returns:
        The whole file as a DataFrame, sorted by its first column (the
        timestamp) in descending order.
    """
    has_row = bool(name and message)
    is_new_file = not os.path.exists(outputfileName)

    if is_new_file or has_row:
        # newline="" is what the csv module requires of files it writes to;
        # UTF-8 matches the default encoding pandas uses to read the file back.
        with open(outputfileName, "a", newline="", encoding="utf-8") as csvfile:
            if is_new_file:
                # NOTE(review): header is kept byte-identical for compatibility
                # with existing files, but its labels ("time, message, text")
                # do not match the fields written below - confirm and align.
                csvfile.write("time, message, text\n")
            if has_row:
                writer = csv.DictWriter(csvfile, fieldnames=["time", "message", "name"])
                writer.writerow(
                    {"time": str(datetime.now()), "message": message.strip(), "name": name.strip()}
                )

    df = pd.read_csv(outputfileName)
    return df.sort_values(df.columns[0], ascending=False)
# Blenderbot checkpoint used by chat(); tokenizer and model are loaded once at import.
mname = "facebook/blenderbot-400M-distill"
tokenizer = BlenderbotTokenizer.from_pretrained(mname)
model = BlenderbotForConditionalGeneration.from_pretrained(mname)
def take_last_tokens(inputs, note_history, history, max_tokens=128):
    """Trim the tokenized conversation to its most recent *max_tokens* tokens.

    When ``inputs['input_ids']`` is longer than *max_tokens* along dimension 1:
      * ``input_ids`` and ``attention_mask`` are replaced (in *inputs*) by
        the last *max_tokens* columns of their first row;
      * the first two ``'</s> <s>'``-separated notes are dropped from
        ``note_history[0]``;
      * the oldest entry is dropped from *history*.
    Otherwise everything is returned unchanged.

    Args:
        inputs: Mapping holding 2-D ``input_ids`` / ``attention_mask`` tensors.
        note_history: One-element list with the joined conversation string.
        history: List of past exchanges.
        max_tokens: Positive maximum number of tokens to keep (default 128,
            the previously hard-coded limit).

    Returns:
        Tuple ``(inputs, note_history, history)``.
    """
    if inputs['input_ids'].shape[1] > max_tokens:
        # Slice the tensors directly instead of round-tripping through lists.
        inputs['input_ids'] = inputs['input_ids'][:1, -max_tokens:]
        inputs['attention_mask'] = inputs['attention_mask'][:1, -max_tokens:]
        note_history = ['</s> <s>'.join(note_history[0].split('</s> <s>')[2:])]
        history = history[1:]
    return inputs, note_history, history
def add_note_to_history(note, note_history):
    """Return the conversation with *note* appended, as a one-element list.

    The entries of *note_history* followed by *note* are joined with the
    ``'</s> <s>'`` separator. The caller's list is left untouched
    (previously *note* was appended to it as a side effect).

    Args:
        note: Text of the new turn.
        note_history: List of earlier conversation strings (possibly empty).

    Returns:
        ``[joined_history]`` - a new single-element list.
    """
    return ['</s> <s>'.join([*note_history, note])]
# Display strings for the chat application.
title = "💬ChatBack🧠💾"
description = (
    "Chatbot With persistent memory dataset allowing multiagent system AI to access a shared dataset as memory pool with stored interactions.\n"
    "Current Best SOTA Chatbot: https://huggingface.co/facebook/blenderbot-400M-distill?text=Hey+my+name+is+ChatBack%21+Are+you+ready+to+rock%3F "
)
def get_base(filename):
    """Return the path of *filename* inside the directory of this script.

    The directory and the file name are combined with ``os.path.join``;
    plain string concatenation dropped the path separator and produced
    paths such as ``/app/dirChatbotMemory2.csv``.

    Args:
        filename: File name relative to this script's directory.

    Returns:
        The combined path (also printed, together with the directory).
    """
    basedir = os.path.dirname(__file__)
    print(basedir)
    loadPath = os.path.join(basedir, filename)
    print(loadPath)
    return loadPath
def chat(message, history):
    """Generate a Blenderbot reply to *message* and optionally persist the turn.

    Args:
        message: The user's new message.
        history: List of ``(user, bot)`` exchanges so far, or a falsy value
            for a fresh conversation.

    Returns:
        Tuple ``(history, df, file_path)``: the updated history; the stored
        conversation as a DataFrame (empty when ``UseMemory`` is false); and
        the path of the memory file (``None`` when ``UseMemory`` is false).
    """
    history = history or []
    # Rebuild the flat '</s> <s>'-separated context from the past exchanges.
    if history:
        history_useful = ['</s> <s>'.join([str(a[0])+'</s> <s>'+str(a[1]) for a in history])]
    else:
        history_useful = []
    history_useful = add_note_to_history(message, history_useful)

    inputs = tokenizer(history_useful, return_tensors="pt")
    inputs, history_useful, history = take_last_tokens(inputs, history_useful, history)
    reply_ids = model.generate(**inputs)
    response = tokenizer.batch_decode(reply_ids, skip_special_tokens=True)[0]

    history_useful = add_note_to_history(response, history_useful)
    list_history = history_useful[0].split('</s> <s>')
    # The last two notes are the message just sent and the reply to it.
    history.append((list_history[-2], list_history[-1]))

    # Bound unconditionally: previously this name only existed inside the
    # `if UseMemory` branch, so the function raised when memory was disabled.
    outputfileName = 'ChatbotMemory2.csv' # Test first time file create
    df = pd.DataFrame()
    file_path = None
    if UseMemory:
        df = store_message(message, response, outputfileName) # Save to dataset
        file_path = get_base(outputfileName)
    return history, df, file_path
# Chat UI: one textbox and button, a table showing the stored conversation,
# and a file component that receives the path returned by chat().
with gr.Blocks() as demo:
    gr.Markdown("<h1><center>🍰Gradio chatbot backed by dataframe CSV memory🎨</center></h1>")

    with gr.Row():
        t1 = gr.Textbox(lines=1, default="", label="Chat Text:")
        b1 = gr.Button("Respond and Retrieve Messages")

    with gr.Row():  # conversation state and stored messages
        s1 = gr.State([])
        df1 = gr.Dataframe(
            wrap=True,
            max_rows=1000,
            overflow_row_behaviour="paginate",
        )

    with gr.Row():  # memory file and a spare markdown area
        file = gr.File(label="File")
        s2 = gr.Markdown()

    # chat() takes (message, history) and returns (history, dataframe, file path).
    b1.click(
        fn=chat,
        inputs=[t1, s1],
        outputs=[s1, df1, file],
    )

demo.launch(debug=True, show_error=True)