Spaces:
Running
Running
import streamlit as st | |
import requests | |
from datetime import datetime | |
import base64 | |
import os | |
import re | |
if "dialog_done" not in st.session_state: | |
st.session_state.dialog_done = False | |
if not st.session_state.dialog_done: | |
st.title("เริ่มต้นคำถาม") | |
st.write("คำถาม: ใครคือคนที่หน้าตาดีที่สุด?") | |
answer = st.text_input("กรุณาตอบคำถามนี้:") | |
if st.button("ส่งคำตอบ"): | |
if answer == "พี่ก้องคนหล่อ": | |
st.session_state.dialog_done = True | |
st.success("คำตอบถูกต้อง! กดปุ่ม 'เข้าสู่หน้าหลัก' เพื่อดำเนินการต่อ") | |
else: | |
st.error("คำตอบไม่ถูกต้อง กรุณาลองใหม่อีกครั้ง.") | |
if st.session_state.dialog_done: | |
if st.button("เข้าสู่หน้าหลัก"): | |
st.experimental_rerun() | |
else: | |
st.title("AI สนับสนุนความรู้ด้าน PDPA") | |
st.write("เราสอบถาม AI สืบค้น และสรุป") | |
system_prompt = "คุณเป็นผู้ช่วยที่มีความรู้ด้านกฎหมาย PDPA และสามารถให้คำตอบที่เกี่ยวข้องเฉพาะตาม context ที่ได้รับ" | |
def clean_text_for_search(text): | |
text = re.sub(r'P-\d+\s*$', '', text, flags=re.MULTILINE) | |
text = re.sub(r'Confidential.*$', '', text, flags=re.MULTILINE) | |
text = ' '.join(text.split()) | |
return text | |
def create_highlighted_pdf(pdf_path, search_text, page_number): | |
try: | |
search_text = clean_text_for_search(search_text) | |
doc = fitz.open(pdf_path) | |
page = doc[int(page_number) - 1] | |
words = [word for word in search_text.split() if word] | |
for word in words: | |
if len(word) > 3: | |
text_instances = page.search_for(word) | |
for inst in text_instances: | |
highlight = page.add_highlight_annot(inst) | |
highlight.set_colors(stroke=(1, 1, 0)) | |
highlight.update() | |
new_doc = fitz.open() | |
new_doc.insert_pdf(doc, from_page=int(page_number) - 1, to_page=int(page_number) - 1) | |
pdf_bytes = new_doc.write() | |
doc.close() | |
new_doc.close() | |
return pdf_bytes | |
except Exception as e: | |
st.error(f"Error in create_highlighted_pdf: {str(e)}") | |
return None | |
def format_file_size(size_in_bytes): | |
for unit in ['B', 'KB', 'MB', 'GB']: | |
if size_in_bytes < 1024: | |
return f"{size_in_bytes:.2f} {unit}" | |
size_in_bytes /= 1024 | |
return f"{size_in_bytes:.2f} GB" | |
def display_search_result(result, index): | |
with st.expander(f"🔍 Search Result #{index + 1} (Score: {result['score']:.4f})"): | |
st.markdown("#### 📄 Document Information") | |
col1, col2 = st.columns(2) | |
with col1: | |
st.markdown("**File Details:**") | |
st.write(f"• File Name: {result['metadata']['file_name']}") | |
st.write(f"• Page: {result['metadata']['page_label']}") | |
st.write(f"• Type: {result['metadata']['file_type']}") | |
st.write(f"• Size: {format_file_size(result['metadata']['file_size'])}") | |
with col2: | |
st.markdown("**Dates:**") | |
st.write(f"• Created: {result['metadata']['creation_date']}") | |
st.write(f"• Modified: {result['metadata']['last_modified_date']}") | |
st.markdown("#### 📝 Content") | |
st.markdown(f"```\n{result['text']}\n```") | |
try: | |
pdf_path = result['file_path'] | |
if os.path.exists(pdf_path): | |
st.markdown("#### 📄 PDF Preview (with highlighted text)") | |
highlighted_pdf = create_highlighted_pdf( | |
pdf_path, | |
result['text'], | |
result['metadata']['page_label'] | |
) | |
if highlighted_pdf: | |
base64_pdf = base64.b64encode(highlighted_pdf).decode('utf-8') | |
pdf_display = f''' | |
<iframe | |
src="data:application/pdf;base64,{base64_pdf}" | |
width="100%" | |
height="800px" | |
type="application/pdf" | |
style="border: 1px solid #ccc; border-radius: 5px;" | |
></iframe> | |
''' | |
st.markdown(pdf_display, unsafe_allow_html=True) | |
else: | |
st.error("Failed to create highlighted PDF") | |
except Exception as e: | |
st.error(f"Error displaying PDF: {str(e)}") | |
if "chat_history" not in st.session_state: | |
st.session_state.chat_history = [] | |
with st.form(key="input_form"): | |
user_input = st.text_input("You:", key="input") | |
submit_button = st.form_submit_button("Send") | |
if submit_button: | |
if user_input: | |
if st.session_state.chat_history: | |
st.session_state.chat_history.insert(0, ("###", "###")) | |
st.session_state.chat_history.insert(0, ("You", user_input)) | |
try: | |
response = requests.post("http://113.53.253.50:8002/search", json={"query": user_input}) | |
response.raise_for_status() | |
data = response.json() | |
search_results = data["results"] | |
st.markdown("### 🔎 Search Results") | |
for idx, result in enumerate(search_results): | |
display_search_result(result, idx) | |
response_text = "\n\n---\n\n".join([f"Text: {result['text']}" for result in search_results]) | |
except requests.RequestException as e: | |
st.error(f"Error: {str(e)}") | |