# Hugging Face Spaces app (Space status when captured: Sleeping)
import os | |
import streamlit as st | |
import fitz | |
from PIL import Image | |
import tempfile | |
from langchain_community.document_loaders import PyPDFLoader, Docx2txtLoader | |
from langchain.chains.question_answering import load_qa_chain | |
from docx import Document | |
import io | |
# from langchain_community.llms import HuggingFaceHub | |
from langchain_huggingface import HuggingFaceEndpoint | |
# The Hugging Face API token is read from the environment so it never has to
# be stored in the source file.
huggingface_token = os.getenv('HUGGINGFACEHUB_API_TOKEN')
# os.getenv returns None when the variable is unset and "" when it is set but
# empty; neither is a usable token, so fail early with a clear message.
if not huggingface_token:
    raise ValueError("No Hugging Face token found. Please set the HUGGINGFACEHUB_API_TOKEN environment variable.")

# LLM client shared by every question-answering call in this module.
llm = HuggingFaceEndpoint(repo_id="mistralai/Mistral-7B-Instruct-v0.3", huggingfacehub_api_token=huggingface_token)

# Initialize the conversation history list once per session; later script runs
# in the same session find the key already present and keep the existing list.
if "conversation_history" not in st.session_state:
    st.session_state.conversation_history = []
# Function to load a document and perform question answering
def process_document(uploaded_file, query):
    """Load an uploaded PDF/DOCX file and answer ``query`` against its content.

    Args:
        uploaded_file: Uploaded file object exposing ``name`` and
            ``getvalue()`` (Streamlit's uploaded-file object, as used by
            ``main``).
        query: Question to ask about the document. When empty or only
            whitespace, the QA chain is not invoked and the answer is ``""``,
            which lets callers use this function purely to extract text.

    Returns:
        tuple: ``(answer, document_text)``. ``answer`` is the chain's
        ``"output_text"`` (or ``""`` for a blank query or an unsupported
        file). ``document_text`` is the newline-joined paragraph text for
        ``.docx`` files and ``None`` for PDFs and unsupported files.
    """
    file_extension = os.path.splitext(uploaded_file.name)[1].lower()
    # Reject unsupported types before anything is written to disk so no
    # temporary file is left behind on this path.
    if file_extension not in (".pdf", ".docx"):
        st.error("Unsupported file format. Please upload a PDF file (.pdf) or a Word document (.docx).")
        return "", None

    # The loaders take a filesystem path, so the upload is spilled to a
    # temporary file. getvalue() returns the whole upload regardless of the
    # current read position, so calling this function more than once on the
    # same upload still sees the full file.
    with tempfile.NamedTemporaryFile(delete=False, suffix=file_extension) as tmp_file:
        tmp_file.write(uploaded_file.getvalue())
        tmp_path = tmp_file.name

    # The file is closed (and therefore fully flushed) before it is parsed,
    # and it is always removed afterwards, even when a loader raises.
    try:
        if file_extension == ".pdf":
            loader = PyPDFLoader(tmp_path)
            document_text = None
        else:
            loader = Docx2txtLoader(tmp_path)
            document = Document(tmp_path)
            document_text = "\n".join(paragraph.text for paragraph in document.paragraphs)
        documents = loader.load()
    finally:
        os.unlink(tmp_path)

    # Nothing to ask: skip the (remote) LLM call and return only the text.
    if not query or not query.strip():
        return "", document_text

    # Load QA chain and perform question answering
    chain = load_qa_chain(llm=llm, verbose=True)
    response = chain.invoke({"input_documents": documents, "question": query})
    return response["output_text"], document_text
# Function to update conversation history
def update_conversation(query, response):
    """Append one question/answer exchange to the session's conversation history.

    Args:
        query: The question that was asked.
        response: The answer that was produced for it.
    """
    exchange = {"question": query, "answer": response}
    history = st.session_state.conversation_history
    history.append(exchange)
# Function to convert PDF pages to images
def pdf_to_images(pdf_bytes):
    """Render every page of an in-memory PDF to a PIL image.

    Args:
        pdf_bytes: Raw bytes of the PDF file.

    Returns:
        list: One ``PIL.Image.Image`` per page, in page order; empty when the
        document has no pages.
    """
    images = []
    # The context manager closes the PyMuPDF document even if rendering a
    # page raises, instead of leaving it open until garbage collection.
    with fitz.open(stream=pdf_bytes, filetype="pdf") as doc:
        for page in doc:
            pixmap = page.get_pixmap()
            # Pixel data is interpreted as RGB, matching get_pixmap()'s
            # default output (see PyMuPDF docs). The resulting image holds
            # its own data, so it stays usable after the document is closed.
            img = Image.frombytes("RGB", (pixmap.width, pixmap.height), pixmap.samples)
            images.append(img)
    return images
# Streamlit UI
def main():
    """Render the app: document upload and preview, plus a question box.

    The sidebar holds the file uploader, the question input and the "Ask"
    button. The main area shows the uploaded document (text for ``.docx``,
    one page image with a download button for ``.pdf``) and, after "Ask",
    the question and the model's answer.
    """
    # Set sidebar title
    st.sidebar.title("7steps.AI")
    st.sidebar.markdown("---")

    # File uploader for document in sidebar
    uploaded_file = st.sidebar.file_uploader("Upload a document", type=["pdf", "docx"])

    # Display document content or images
    if uploaded_file is not None:
        st.title("Document Content")
        file_extension = os.path.splitext(uploaded_file.name)[1].lower()
        if file_extension == ".docx":
            # Rewind defensively so the document is read from the start.
            uploaded_file.seek(0)
            _, document_text = process_document(uploaded_file, "")
            if document_text is not None:
                st.text_area("Document Text", value=document_text, height=300)
        elif file_extension == ".pdf":
            images = pdf_to_images(uploaded_file.getvalue())
            if images:
                page_number = st.number_input("Page Number", value=1, min_value=1, max_value=len(images))
                # page_number is 1-based for the user; the list is 0-based.
                page_image = images[page_number - 1]
                st.image(page_image, caption=f"Page {page_number}", use_column_width=True)
                # Download button for the currently displayed page
                img_bytes = io.BytesIO()
                page_image.save(img_bytes, format='PNG')
                st.download_button("Download Image", img_bytes.getvalue(), f'Page_{page_number}.png')

    # Text box for new question in sidebar
    query = st.sidebar.text_input("Enter your question:")

    # "Ask" button in sidebar
    if st.sidebar.button("Ask"):
        if uploaded_file is None:
            st.sidebar.write("Please upload a document first.")
        elif not query.strip():
            # Do not spend a model call on an empty question.
            st.sidebar.write("Please enter a question.")
        else:
            # The .docx preview above may already have read the upload to the
            # end during this script run; rewind so the document is not seen
            # as empty when it is processed again.
            uploaded_file.seek(0)
            response, _ = process_document(uploaded_file, query)
            if response:  # Check if response is not empty
                st.write("You:", query)
                st.write("AI:", response)
                update_conversation(query, response)

    # Conversation history is recorded by update_conversation but its display
    # is currently disabled:
    # st.title("Conversation History")
    # for item in st.session_state.conversation_history:
    #     st.write("You:", item["question"])
    #     st.write("AI:", item["answer"])
# Run the application only when this file is executed as the main script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()