# Page-header residue from the hosting site (not part of the program):
# joaopaulopresa's picture
# Update app.py
# 52beb16 verified
import streamlit as st
import json
from data_module import faq_data, model_options
import uuid
from chat_handler import ChatHandler
# Module-level chat backend; the helpers below use it to generate ids,
# clear history and post messages.
# NOTE(review): Streamlit re-executes this script on every interaction, so a
# new ChatHandler is presumably constructed on each rerun — confirm that is
# intended (e.g. that conversation history is not held only on the instance).
chat = ChatHandler()
def add_custom_css():
    """Inject the app's custom stylesheet into the page.

    Emits a raw ``<style>`` element through ``st.markdown`` that sets the
    ``.css-1d391kg`` class to 35% width.
    """
    # NOTE(review): `.css-1d391kg` is presumably Streamlit's generated sidebar
    # class; such names can change between Streamlit releases — confirm.
    stylesheet = """
<style>
.css-1d391kg { width: 35%; }
</style>
"""
    # HTML is only rendered as markup when unsafe_allow_html is enabled.
    st.markdown(stylesheet, unsafe_allow_html=True)
def generate_user_id():
    """Return a fresh identifier obtained from the module-level chat handler."""
    return chat.generate_id()
def clear_history(user_id):
    """Ask the chat handler to clear the history stored for ``user_id``.

    The handler's own return value is discarded; this function always
    returns the literal string ``'response'``, which callers treat as a
    truthy success flag.
    """
    chat.clear_history(user_id)
    status = 'response'
    return status
# Create the user id once and keep it in session state so later reruns of
# the script reuse it.
if 'user_id' not in st.session_state:
    st.session_state['user_id'] = generate_user_id()

# Load the embedding-model catalogue; each entry is read below for its
# "model" and "dir" keys. JSON text is UTF-8 by specification, so decode it
# explicitly instead of relying on the platform's locale default.
with open("embeddings_db_model.json", "r", encoding="utf-8") as file:
    embedding_models = json.load(file)
embedding_model_names = [model["model"] for model in embedding_models]

# Agent types offered in the sidebar selector.
agent_types = [
    'JSON_CHAT_MODEL',
    'REACT_TEXT',
]

# Sidebar selectors. The chosen embedding name is mapped back to the "dir"
# of the first catalogue entry with that name.
selected_model = st.sidebar.selectbox("Escolha o Modelo LLM", model_options)
selected_embedding_model = st.sidebar.selectbox("Escolha o Modelo de Embedding", embedding_model_names)
selected_embedding_dir = next(item for item in embedding_models if item["model"] == selected_embedding_model)["dir"]
selected_agent_type = st.sidebar.selectbox("Escolha o Tipo de Agent", agent_types)

add_custom_css()
# Sidebar "control options" section containing the clear-history button.
with st.sidebar:
    st.write("## Opções de Controle")
    if st.button('Limpar Histórico'):
        # Ask the chat handler to clear the history for this session's user id.
        # NOTE(review): clear_history() as defined in this file always returns
        # the truthy string 'response', so the else branch below cannot
        # currently run.
        response = clear_history(st.session_state['user_id'])
        if response:
            # Reset the visible conversation to a single assistant notice and
            # rerun the script so the page redraws from the new state.
            st.session_state.messages = [{"role": "assistant", "content": "Histórico limpo. Pode começar uma nova conversa."}]
            st.rerun()
        else:
            st.error("Erro ao limpar o histórico")

# Two equal-width columns showing the currently selected LLM and embedding
# model.
# NOTE(review): the source's indentation was lost; this container is assumed
# to sit at top level (main page) rather than inside the sidebar block above —
# confirm against the intended layout.
with st.container():
    col1, col2 = st.columns([1, 1])
    with col1:
        st.caption("LLM:")
        st.write(selected_model)
    with col2:
        st.caption("Embeddings:")
        st.write(selected_embedding_model)
# Page heading.
st.title("⚖️ ChatBot Direito Tributário")
st.caption("Direito Tributário da Pessoa Jurídica")
st.caption("Projeto do Workshop de LLM UFG")

# Seed session state with its initial values; keys that already exist are
# left untouched so the conversation survives reruns.
for state_key, initial_value in (
    ("messages", [{"role": "assistant", "content": "Olá como posso ajudar?"}]),
    ("faq_question", None),
):
    if state_key not in st.session_state:
        st.session_state[state_key] = initial_value

# Replay the stored conversation, choosing the avatar by role.
for msg in st.session_state.messages:
    img = "server_icon.png" if msg['role'] == 'assistant' else 'user_icon.png'
    st.chat_message(msg["role"], avatar=img).write(msg["content"])
def process_question(question):
    """Send ``question`` to the chat handler and render the exchange.

    Appends the user message to ``st.session_state.messages`` and shows it,
    posts the question together with the currently selected models and agent
    type via ``chat.post_message``, then shows and stores the reply. An
    expander lists the intermediate steps returned alongside the reply, or
    only the final answer when there are none.

    Args:
        question: Text of the user's question.
    """
    # NOTE(review): the source's indentation was lost; the nesting below
    # (reply and expander inside the assistant chat bubble, after the spinner)
    # is a reconstruction — confirm against the intended layout.
    st.session_state.messages.append({"role": "user", "content": question})
    st.chat_message("user", avatar="user_icon.png").write(question)

    with st.chat_message("assistant", avatar="server_icon.png"):
        with st.spinner("Thinking..."):
            payload = {
                "user_id": st.session_state['user_id'],
                "text": question,
                "embedding_model": selected_embedding_model,
                "embedding_dir": selected_embedding_dir,
                "model": selected_model,
                "agent_type": selected_agent_type,
            }
            msg, intermediary_steps = chat.post_message(message=payload)

        st.write(str(msg))
        st.session_state.messages.append({"role": "assistant", "content": msg})

        with st.expander("Ver Passos Intermediários"):
            if not intermediary_steps:
                # No intermediate steps: show only the final answer.
                st.markdown("#### > Entering new AgentExecutor chain...\n")
                st.markdown("**Ação:** Final Answer")
                st.markdown(f"**Entrada da Ação:** `{msg}`")
                st.markdown("#### > Finished chain...")
            elif intermediary_steps[0] == 'erro':
                # The handler signals failure with 'erro' as the first step.
                st.markdown("## ERROR...\n")
            else:
                st.markdown("## > Entering new AgentExecutor chain...\n")
                for index, step in enumerate(intermediary_steps, start=1):
                    # The first element of each step carries the action
                    # details; fall back to placeholders when an attribute
                    # is missing.
                    step_action = step[0]
                    action = getattr(step_action, 'tool', 'Unknown')
                    action_input = getattr(step_action, 'tool_input', 'N/A')
                    log = getattr(step_action, 'log', 'No log available')
                    st.markdown(f"**Passo {index}:**")
                    st.markdown(f" **Ação:** `{action}`")
                    st.markdown(f" **Entrada da Ação:** `{action_input}`")
                    st.code(log, language='json')
                    st.markdown("---")
                # Close the trace with the final answer.
                st.markdown("**Ação:** Final Answer")
                st.markdown(f"**Entrada da Ação:** `{msg}`")
                st.markdown("## > Finished chain.")
def add_faq_question_to_chat(question):
    """Store ``question`` in session state under the ``faq_question`` key."""
    st.session_state.faq_question = question
# Sidebar list of frequently asked questions: one expander per FAQ entry,
# each showing the answer and a button that selects that question.
with st.sidebar:
    st.write("## Perguntas Frequentes")
    for index, item in enumerate(faq_data, start=1):
        # The label keeps a literal backslash before the dot (presumably so
        # Markdown does not read "1." as an ordered-list marker). It is
        # written as "\\." because a bare "\." is an invalid escape sequence
        # that Python warns about; the resulting string is unchanged.
        question_with_number = f"{index}\\. {item['question']}"
        expander = st.expander(question_with_number, expanded=False)
        with expander:
            st.write(item["answer"])
            # Each button needs a unique key because the labels are identical.
            button_key = f"button_{index}"
            if st.button("Enviar esta pergunta", key=button_key):
                st.session_state['selected_question'] = item["question"]
# Move a question selected in the FAQ sidebar into the pending slot, then
# drop the selection so it is handled only once.
picked_question = st.session_state.get('selected_question')
if picked_question:
    add_faq_question_to_chat(picked_question)
    del st.session_state['selected_question']

# Answer the pending FAQ question, if any, and clear the slot.
pending_question = st.session_state["faq_question"]
if pending_question:
    process_question(pending_question)
    st.session_state["faq_question"] = None

# Answer whatever the user typed into the chat box.
prompt = st.chat_input()
if prompt:
    process_question(prompt)