Spaces:
Runtime error
Runtime error
import streamlit as st | |
import json | |
from data_module import faq_data, model_options | |
import uuid | |
from chat_handler import ChatHandler | |
chat = ChatHandler() | |
def add_custom_css(): | |
st.markdown(""" | |
<style> | |
.css-1d391kg { width: 35%; } | |
</style> | |
""", unsafe_allow_html=True) | |
def generate_user_id(): | |
new_id = chat.generate_id() | |
return new_id | |
def clear_history(user_id): | |
chat.clear_history(user_id) | |
return 'response' | |
if 'user_id' not in st.session_state: | |
st.session_state['user_id'] = generate_user_id() | |
with open("embeddings_db_model.json", "r") as file: | |
embedding_models = json.load(file) | |
embedding_model_names = [model["model"] for model in embedding_models] | |
agent_types = [ | |
'JSON_CHAT_MODEL', | |
'REACT_TEXT' | |
] | |
selected_model = st.sidebar.selectbox("Escolha o Modelo LLM", model_options) | |
selected_embedding_model = st.sidebar.selectbox("Escolha o Modelo de Embedding", embedding_model_names) | |
selected_embedding_dir = next(item for item in embedding_models if item["model"] == selected_embedding_model)["dir"] | |
selected_agent_type = st.sidebar.selectbox("Escolha o Tipo de Agent", agent_types) | |
add_custom_css() | |
with st.sidebar: | |
st.write("## Opções de Controle") | |
if st.button('Limpar Histórico'): | |
# Fazer a requisição para limpar o histórico | |
response = clear_history(st.session_state['user_id']) | |
if response: | |
st.session_state.messages = [{"role": "assistant", "content": "Histórico limpo. Pode começar uma nova conversa."}] | |
st.rerun() | |
else: | |
st.error("Erro ao limpar o histórico") | |
with st.container(): | |
col1, col2 = st.columns([1, 1]) | |
with col1: | |
st.caption("LLM:") | |
st.write(selected_model) | |
with col2: | |
st.caption("Embeddings:") | |
st.write(selected_embedding_model) | |
st.title("⚖️ ChatBot Direito Tributário") | |
st.caption("Direito Tributário da Pessoa Jurídica") | |
st.caption("Projeto do Workshop de LLM UFG") | |
if "messages" not in st.session_state: | |
st.session_state["messages"] = [{"role": "assistant", "content": "Olá como posso ajudar?"}] | |
if "faq_question" not in st.session_state: | |
st.session_state["faq_question"] = None | |
# Input de chat do usuário | |
for msg in st.session_state.messages: | |
if msg['role'] == 'assistant': | |
img = "server_icon.png" | |
else: | |
img = 'user_icon.png' | |
st.chat_message(msg["role"],avatar=img).write(msg["content"]) | |
def process_question(question): | |
st.session_state.messages.append({"role": "user", "content": question}) | |
st.chat_message("user", avatar="user_icon.png").write(question) | |
with st.chat_message("assistant", avatar="server_icon.png"): | |
with st.spinner("Thinking..."): | |
data = dict( | |
user_id=st.session_state['user_id'], | |
text= question, | |
embedding_model= selected_embedding_model, | |
embedding_dir= selected_embedding_dir, | |
model= selected_model, | |
agent_type=selected_agent_type | |
) | |
msg,intermediary_steps = chat.post_message(message=data) | |
st.write(str(msg)) | |
st.session_state.messages.append({"role": "assistant", "content": msg}) | |
# Adicionando os passos intermediários | |
#intermediary_steps = response['response']['intermediate_steps'] | |
# intermediary_steps = [] | |
if intermediary_steps: | |
with st.expander("Ver Passos Intermediários"): | |
if intermediary_steps[0] == 'erro': | |
st.markdown("## ERROR...\n") | |
else: | |
st.markdown("## > Entering new AgentExecutor chain...\n") | |
for index, step in enumerate(intermediary_steps, start=1): | |
# action = step[0].get('tool', 'Unknown') | |
action = step[0].tool if hasattr(step[0], 'tool') else 'Unknown' | |
# action_input = step[0].get('tool_input', 'N/A') | |
# log = step[0].get('log', 'No log available') | |
action_input = step[0].tool_input if hasattr(step[0], 'tool_input') else 'N/A' | |
log = step[0].log if hasattr(step[0], 'log') else 'No log available' | |
st.markdown(f"**Passo {index}:**") | |
st.markdown(f" **Ação:** `{action}`") | |
st.markdown(f" **Entrada da Ação:** `{action_input}`") | |
st.code(log, language='json') | |
st.markdown("---") | |
# Adiciona a ação "Final Answer" ao final dos passos | |
st.markdown("**Ação:** Final Answer") | |
st.markdown(f"**Entrada da Ação:** `{msg}`") | |
st.markdown("## > Finished chain.") | |
else: | |
with st.expander("Ver Passos Intermediários"): | |
st.markdown("#### > Entering new AgentExecutor chain...\n") | |
st.markdown("**Ação:** Final Answer") | |
st.markdown(f"**Entrada da Ação:** `{msg}`") | |
st.markdown("#### > Finished chain...") | |
def add_faq_question_to_chat(question): | |
st.session_state["faq_question"] = question | |
# Barra lateral com perguntas frequentes | |
with st.sidebar: | |
st.write("## Perguntas Frequentes") | |
for index, item in enumerate(faq_data, start=1): | |
question_with_number = f"{index}\. {item['question']}" | |
expander = st.expander(question_with_number, expanded=False) | |
with expander: | |
st.write(item["answer"]) | |
button_key = f"button_{index}" | |
if st.button("Enviar esta pergunta", key=button_key): | |
st.session_state['selected_question'] = item["question"] | |
if 'selected_question' in st.session_state and st.session_state['selected_question']: | |
add_faq_question_to_chat(st.session_state['selected_question']) | |
del st.session_state['selected_question'] | |
if st.session_state["faq_question"]: | |
process_question(st.session_state["faq_question"]) | |
st.session_state["faq_question"] = None | |
if prompt := st.chat_input(): | |
process_question(prompt) | |