Spaces:
Build error
Build error
"""RockyBot: a Streamlit news research tool.

The user enters up to three article URLs in the sidebar.  On "Process URLs"
the pages are loaded, split into chunks, embedded with Google PaLM embeddings
and stored in a FAISS index that is pickled to disk.  A question typed into
the main area is then answered from that index, with sources listed.
"""
import os
import pickle
import time

import google.generativeai as palm
import streamlit as st
from dotenv import load_dotenv
from langchain.chains import RetrievalQAWithSourcesChain
from langchain.document_loaders import UnstructuredURLLoader
from langchain.embeddings import GooglePalmEmbeddings
from langchain.llms import GooglePalm
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS

# Take environment variables from .env.
# NOTE(review): the original comment mentioned an OpenAI key, but this script
# only uses Google PaLM models -- presumably the PaLM/Google API key is what
# must be present in .env; confirm.
load_dotenv()


def _index_urls(urls, index_path, status):
    """Load *urls*, embed their text and pickle the FAISS index.

    Args:
        urls: Non-empty list of article URLs to fetch.
        index_path: File path the pickled vector store is written to.
        status: Streamlit placeholder used for progress messages.

    Returns:
        True when an index was built and saved, False when the URLs yielded
        no text chunks (in which case nothing is written).
    """
    # NOTE(review): the trailing "β β β " in the status strings (and the "π"
    # in the title) look like mis-decoded emoji; they are kept byte-for-byte
    # here -- restore the intended characters from the original file.

    # Load data.
    loader = UnstructuredURLLoader(urls=urls)
    status.text("Data Loading...Started...β β β ")
    data = loader.load()

    # Split data.
    text_splitter = RecursiveCharacterTextSplitter(
        separators=['\n\n', '\n', '.', ','],
        chunk_size=1000,
    )
    status.text("Text Splitter...Started...β β β ")
    docs = text_splitter.split_documents(data)
    if not docs:
        # Nothing to embed; do not hand an empty list to the vector store.
        return False

    # Create embeddings and build the FAISS index.
    embeddings = GooglePalmEmbeddings()
    vectorstore = FAISS.from_documents(docs, embeddings)
    status.text("Embedding Vector Started Building...β β β ")
    # Presumably keeps the status message visible for a moment before the
    # placeholder is reused for the question box.
    time.sleep(2)

    # Save the FAISS index to a pickle file.
    # Fix: pickle.dump() writes to the file object; the original called
    # pickle.dumps(obj, f), which treats `f` as the protocol argument, raises
    # TypeError and leaves an empty file behind.
    # NOTE(review): some LangChain versions cannot pickle a FAISS store;
    # FAISS.save_local()/load_local() is the library's own persistence API.
    with open(index_path, "wb") as f:
        pickle.dump(vectorstore, f)
    return True


st.title("RockyBot: News Research Tool π")
st.sidebar.title("News Article URLs")

# One sidebar text box per URL slot; unfilled slots yield empty strings.
urls = [st.sidebar.text_input(f"URL {i+1}") for i in range(3)]
process_url_clicked = st.sidebar.button("Process URLs")
file_path = "faiss_store_openai.pkl"

main_placeholder = st.empty()
llm = GooglePalm()

if process_url_clicked:
    # Drop blank slots so the loader is only given real URLs.
    entered_urls = [u.strip() for u in urls if u and u.strip()]
    if not entered_urls:
        st.sidebar.warning("Please enter at least one URL before processing.")
    elif not _index_urls(entered_urls, file_path, main_placeholder):
        st.sidebar.warning("No text could be extracted from the given URLs.")

query = main_placeholder.text_input("Question: ")
if query:
    if os.path.exists(file_path):
        # SECURITY: pickle.load executes arbitrary code from the file.  This
        # is only acceptable because the file is written by this script;
        # never point file_path at untrusted data.
        with open(file_path, "rb") as f:
            vectorstore = pickle.load(f)

        chain = RetrievalQAWithSourcesChain.from_llm(
            llm=llm, retriever=vectorstore.as_retriever()
        )
        result = chain({"question": query}, return_only_outputs=True)
        # result will be a dictionary of this format -->
        # {"answer": "", "sources": [] }
        st.header("Answer")
        st.write(result["answer"])

        # Display sources, if available.
        sources = result.get("sources", "")
        if sources:
            st.subheader("Sources:")
            # Sources arrive as one newline-separated string.
            for source in sources.split("\n"):
                st.write(source)
    else:
        # Previously a question asked before any index existed was silently
        # ignored; tell the user what to do instead.
        st.warning("No index found. Enter URLs and click 'Process URLs' first.")