# Source: Hugging Face Spaces page (Space status at capture time: Sleeping).
# Reference: https://huggingface.co/spaces/johnmuchiri/anspro1/blob/main/app.py
# Resource: https://python.langchain.com/docs/modules/chains
import streamlit as st | |
from langchain_community.document_loaders.pdf import PyPDFLoader | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain_community.vectorstores.pinecone import Pinecone | |
from langchain_openai import OpenAIEmbeddings, ChatOpenAI | |
from langchain.memory import ConversationBufferMemory | |
from langchain_core.prompts import ChatPromptTemplate | |
from langchain.chains import ConversationalRetrievalChain, RetrievalQAWithSourcesChain | |
import openai | |
from dotenv import load_dotenv | |
import os | |
import pinecone | |
load_dotenv() | |
# Goal: a Streamlit app, hosted on Hugging Face, that uses the OpenAI API and
# the LangChain data framework. The user should be able to upload a document
# and ask questions about it; the app should respond with an answer and also
# display where the response is referenced from, using some sort of visual
# annotation on the document.
# Directory (relative to the working directory) where uploaded PDF files are
# written before they are loaded and split into chunks.
SAVE_DIR = "pdf"
def generate_response(pages, query_text, k, chain_type):
    """Answer a question about the uploaded document via retrieval-augmented QA.

    The document chunks are embedded with OpenAI embeddings, stored in the
    Pinecone index ``document-chat``, and queried through a
    ``RetrievalQAWithSourcesChain``.

    Args:
        pages: Document chunks to index (the output of the PDF loader/splitter).
        query_text: The user's question.
        k: Number of most-similar chunks the retriever should hand to the LLM.
        chain_type: Combine-documents strategy forwarded to
            ``from_chain_type`` (e.g. ``"stuff"``, ``"map_reduce"``).

    Returns:
        The chain's response mapping (callers read its ``"answer"`` and
        ``"source_documents"`` entries), or ``None`` when there is no document
        or no question to work with.
    """
    # Nothing to index or nothing to ask: skip the (billable) embedding and
    # LLM calls entirely. Callers already treat a falsy result as "no answer".
    if not pages or not query_text:
        return None

    pinecone.init(
        api_key=os.getenv("PINECONE_API_KEY"),
        environment=os.getenv("PINECONE_ENV_NAME"),
    )
    # NOTE: this embeds `pages` and writes them to the index on every call.
    vector_db = Pinecone.from_documents(
        documents=pages,
        embedding=OpenAIEmbeddings(),
        index_name="document-chat",
    )
    # `search_kwargs` (not `search_kwards`) is the keyword the retriever reads;
    # with the misspelling the requested `k` was silently ignored.
    retriever = vector_db.as_retriever(
        search_type="similarity",
        search_kwargs={"k": k},
    )
    llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0)

    # Build the question-answering chain; source documents are returned so the
    # UI can show where the answer came from.
    qa = RetrievalQAWithSourcesChain.from_chain_type(
        llm=llm,
        chain_type=chain_type,
        retriever=retriever,
        return_source_documents=True,
    )
    return qa({"question": query_text})
def visual_annotate(document, answer):
    """Return *document* with the first occurrence of *answer* marked in bold.

    The match is wrapped in Markdown ``**`` markers. Only the first occurrence
    is highlighted.

    Args:
        document: Full document text.
        answer: Text to highlight inside ``document``.

    Returns:
        The annotated text, or ``document`` unchanged when ``answer`` is empty
        or does not occur in it.
    """
    # An empty answer "matches" at index 0 and would just prepend "****".
    if not answer:
        return document

    start = document.find(answer)
    # str.find signals "not found" with -1; slicing with it would wrap the
    # wrong characters and corrupt the text, so leave the document as-is.
    if start == -1:
        return document

    end = start + len(answer)
    return f"{document[:start]}**{document[start:end]}**{document[end:]}"
# Page chrome.
st.set_page_config(page_title="🦜🔗 Ask the Doc App")
st.title("Document Question Answering App")

# Sidebar form collecting the credentials the OpenAI and Pinecone clients read
# from the environment.
with st.sidebar.form(key="sidebar-form"):
    st.header("Configurations")
    openai_api_key = st.text_input("Enter OpenAI API key here", type="password")
    pinecone_api_key = st.text_input("Enter your Pinecone API key", type="password")
    pinecone_env_name = st.text_input("Enter your Pinecone environment name")

    # Export only values the user actually entered: writing an empty string
    # would overwrite credentials already present in the environment (for
    # example those loaded from a .env file at startup).
    if openai_api_key:
        os.environ["OPENAI_API_KEY"] = openai_api_key
    if pinecone_api_key:
        os.environ["PINECONE_API_KEY"] = pinecone_api_key
    if pinecone_env_name:
        os.environ["PINECONE_ENV_NAME"] = pinecone_env_name

    submitted = st.form_submit_button(
        label="Submit",
        # disabled=not (openai_api_key and pinecone_api_key and pinecone_env_name),
    )
left_column, right_column = st.columns(2)

# Left column: upload a PDF, choose retrieval settings, and ask a question.
with left_column:
    uploaded_file = st.file_uploader("Choose a pdf file", type="pdf")
    pages = []
    if uploaded_file is not None:
        # The save directory may not exist on a fresh deployment.
        os.makedirs(SAVE_DIR, exist_ok=True)
        # Keep only the final path component so a crafted upload name cannot
        # write outside SAVE_DIR.
        file_path = os.path.join(SAVE_DIR, os.path.basename(uploaded_file.name))
        with open(file_path, "wb") as f:
            f.write(uploaded_file.getbuffer())
        st.success(f"File {uploaded_file.name} is saved at path {file_path}")

        # Load the PDF and split it into 500-character, non-overlapping chunks.
        loader = PyPDFLoader(file_path=file_path)
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=0)
        pages = loader.load_and_split(text_splitter=text_splitter)

    query_text = st.text_input(
        "Enter your question:", placeholder="Please provide a short summary."
    )
    chain_type = st.selectbox(
        "chain type", ("stuff", "map_reduce", "refine", "map_rerank")
    )
    k = st.slider("Number of relevant chunks", 1, 5)

    # Streamlit reruns the script on every interaction; only hit the vector
    # store and the LLM once there is both a document and a question.
    response = None
    if pages and query_text:
        with st.spinner("Retrieving and generating a response ..."):
            response = generate_response(
                pages=pages, query_text=query_text, k=k, chain_type=chain_type
            )
# Right column: show the answer and the chunks it was derived from.
with right_column:
    st.write("Output of your question")
    if response:
        st.subheader("Result")
        st.write(response["answer"])

        # List the page number and source file recorded in each retrieved
        # chunk's metadata.
        st.subheader("source_documents")
        for source_doc in response["source_documents"]:
            st.write("page: ", source_doc.metadata["page"])
            st.write("source: ", source_doc.metadata["source"])
    else:
        st.write("response not showing at the moment")
# with st.form("myform", clear_on_submit=True): | |
# openai_api_key = st.text_input( | |
# "OpenAI API Key", type="password", disabled=not (uploaded_file and query_text) | |
# ) | |
# submitted = st.form_submit_button( | |
# "Submit", disabled=not (pages and query_text) | |
# ) | |
# if submitted and openai_api_key.startswith("sk-"): | |
# with st.spinner("Calculating..."): | |
# response = generate_response(pages, openai_api_key, query_text) | |
# result.append(response) | |
# del openai_api_key | |
# if len(result): | |
# st.info(response) | |
# if st.button("Get Answer"): | |
# answer = get_answer(question, document) | |
# st.write(answer["answer"]) | |
# # Visual annotation on the document | |
# annotated_document = visual_annotate(document, answer["answer"]) | |
# st.markdown(annotated_document) | |