import os
import time

import streamlit as st

try:
    from src.models import get_model_names
    from src.open_strawberry import get_defaults, manage_conversation
except ImportError:
    # Fall back to the flat module names when the `src` package is not importable.
    from models import get_model_names
    from open_strawberry import get_defaults, manage_conversation

# Project-wide defaults used to seed the widgets and session state below.
(
    model,
    system_prompt,
    initial_prompt,
    expected_answer,
    next_prompts,
    num_turns,
    show_next,
    final_prompt,
    temperature,
    max_tokens,
    num_turns_final_mod,
    show_cot,
    verbose,
) = get_defaults()

st.title("Open Strawberry Conversation")
st.markdown("[Open Strawberry GitHub Repo](https://github.com/pseudotensor/open-strawberry)")

# Initialize session state: each key is only seeded when it is missing, so
# values written by earlier script runs survive a rerun.
_session_defaults = {
    "messages": [],
    "turn_count": 0,
    "input_key": 0,
    "conversation_started": False,
    "waiting_for_continue": False,
    "generator": None,  # Store the generator in session state
    "prompt": None,  # Store the prompt in session state
    "answer": None,
    "system_prompt": None,
    "output_tokens": 0,
    "input_tokens": 0,
    "cache_creation_input_tokens": 0,
    "cache_read_input_tokens": 0,
    "verbose": verbose,
    "max_tokens": max_tokens,
    "seed": 0,
}
for _key, _default in _session_defaults.items():
    if _key not in st.session_state:
        st.session_state[_key] = _default
if "temperature" not in st.session_state:
    st.session_state.temperature = temperature
if "next_prompts" not in st.session_state:
    st.session_state.next_prompts = next_prompts
if "final_prompt" not in st.session_state:
    st.session_state.final_prompt = final_prompt


def _md_breaks(text):
    """Return *text* with every newline rewritten for Markdown rendering.

    NOTE(review): a Markdown hard line break normally needs two trailing
    spaces; this copy of the file shows a single space before the newline.
    Confirm the replacement string was not whitespace-collapsed.
    """
    return text.replace('\n', ' \n')


def _env_file_path():
    """Return the path of the `.env` file one directory above this script."""
    return os.path.join(os.path.dirname(__file__), "..", ".env")


# Function to display chat messages
def display_chat():
    """Re-render every message stored in ``st.session_state.messages``.

    Assistant messages are rendered as a final answer, a step title, or a
    collapsible "Chain of Thoughts" expander depending on their flags.
    Non-initial user messages are skipped unless "Show Next" is enabled.
    """
    display_step = 1
    for message in st.session_state.messages:
        if message["role"] == "assistant":
            if message.get('final'):
                display_final(message)
            elif message.get('turn_title'):
                display_turn_title(message, display_step=display_step)
                display_step += 1
            else:
                with st.expander("Chain of Thoughts", expanded=st.session_state["show_cot"]):
                    assistant_container1 = st.chat_message("assistant")
                    with assistant_container1.container():
                        st.markdown(_md_breaks(message["content"]), unsafe_allow_html=True)
        elif message["role"] == "user":
            if not message["initial"] and not st.session_state.show_next:
                continue
            user_container1 = st.chat_message("user")
            with user_container1:
                st.markdown(_md_breaks(message["content"]), unsafe_allow_html=True)


def display_final(chunk1, can_rerun=False):
    """Render a final-answer message and optionally trigger a rerun.

    Args:
        chunk1: Message dict; only rendered when its ``'final'`` entry is
            truthy. ``chunk1["content"]`` holds the answer text.
        can_rerun: When true, call ``st.rerun()`` afterwards so the sidebar
            token statistics are refreshed.
    """
    if chunk1.get('final'):
        # NOTE(review): the headings below are emitted with
        # unsafe_allow_html=True but contain no markup in this copy of the
        # file; HTML tags may have been stripped -- confirm against upstream.
        expected = st.session_state.answer
        if expected:
            if expected.strip() in chunk1["content"]:
                st.markdown('\n\n🏆 Final Answer\n\n', unsafe_allow_html=True)
            else:
                st.markdown(f'Expected: **{expected.strip()}**', unsafe_allow_html=True)
                st.markdown('\n\n👎 Final Answer\n\n', unsafe_allow_html=True)
        else:
            st.markdown('\n\n👌 Final Answer\n\n', unsafe_allow_html=True)

        final = _md_breaks(chunk1["content"].strip())
        # NOTE(review): the original condition tested two literals that are
        # both a bare newline in this copy (the second may have been an HTML
        # line-break tag lost in extraction) -- confirm against upstream.
        if '\n' in final:
            st.markdown(f'{final}', unsafe_allow_html=True)
        else:
            # Single-line answers are emphasised in bold.
            st.markdown(f'**{final}**', unsafe_allow_html=True)
    if can_rerun:
        # rerun to get token stats
        st.rerun()


def display_turn_title(chunk1, display_step=None):
    """Render the title line of a reasoning step with its timing.

    Args:
        chunk1: Message dict; only rendered when ``'turn_title'`` is truthy.
            Reads ``content``, ``thinking_time`` and ``total_thinking_time``.
        display_step: Step number to show. When ``None`` (live streaming) the
            current ``turn_count`` is used and the label is "Completed Step".
    """
    if display_step is None:
        display_step = st.session_state.turn_count
        name = "Completed Step"
    else:
        name = "Step"
    if chunk1.get('turn_title'):
        turn_title = _md_breaks(chunk1["content"].strip())
        step_time = f' in time {int(chunk1["thinking_time"])}s'
        acum_time = f' in total {int(chunk1["total_thinking_time"])}s'
        st.markdown(f'**{name} {display_step}: {turn_title}{step_time}{acum_time}**', unsafe_allow_html=True)


if st.button("Start Reasoning Engine", disabled=st.session_state.conversation_started):
    st.session_state.conversation_started = True

# Sidebar
st.sidebar.title("Controls")

on_hf_spaces = os.getenv("HF_SPACES", '0') == '1'


def save_env_vars(env_vars):
    """Persist each key/value pair of *env_vars* into the local `.env` file.

    Must not be called on HF Spaces (guarded by an assertion).
    """
    assert not on_hf_spaces, "Cannot save env vars in HF Spaces"
    from dotenv import set_key
    env_path = _env_file_path()
    for key, value in env_vars.items():
        set_key(env_path, key, value)


def get_dotenv_values():
    """Return the editable secrets mapping.

    On HF Spaces this is the in-memory session dict; otherwise it is the
    parsed content of the local `.env` file.
    """
    if on_hf_spaces:
        return st.session_state.secrets
    from dotenv import dotenv_values
    return dotenv_values(_env_file_path())


if 'secrets' not in st.session_state:
    if on_hf_spaces:
        # allow user to enter
        st.session_state.secrets = dict(OPENAI_API_KEY='',
                                        OPENAI_BASE_URL='https://api.openai.com/v1',
                                        OPENAI_MODEL_NAME='',
                                        # OLLAMA_OPENAI_API_KEY='',
                                        # OLLAMA_OPENAI_BASE_URL='http://localhost:11434/v1/',
                                        # OLLAMA_OPENAI_MODEL_NAME='',
                                        # AZURE_OPENAI_API_KEY='',
                                        # AZURE_OPENAI_API_VERSION='',
                                        # AZURE_OPENAI_ENDPOINT='',
                                        # AZURE_OPENAI_DEPLOYMENT='',
                                        # AZURE_OPENAI_MODEL_NAME='',
                                        GEMINI_API_KEY='',
                                        # MISTRAL_API_KEY='',
                                        GROQ_API_KEY='',
                                        CEREBRAS_OPENAI_API_KEY='',
                                        ANTHROPIC_API_KEY='',
                                        )
    else:
        st.session_state.secrets = {}


def update_model_selection():
    """Reset the selected model to the first visible one if it is no longer listed."""
    visible_models1 = get_model_names(st.session_state.secrets, on_hf_spaces)
    if visible_models1 and "model_name" in st.session_state:
        if st.session_state.model_name not in visible_models1:
            st.session_state.model_name = visible_models1[0]


# Replace the existing model selection code with this
if 'model_name' not in st.session_state or not st.session_state.model_name:
    update_model_selection()

# Model selection
visible_models = get_model_names(st.session_state.secrets, on_hf_spaces)
st.sidebar.selectbox("Select Model", visible_models, key="model_name",
                     disabled=st.session_state.conversation_started)
st.sidebar.checkbox("Show Next", value=show_next, key="show_next",
                    disabled=st.session_state.conversation_started)
st.sidebar.number_input("Num Turns to Check if Final Answer", value=num_turns_final_mod,
                        key="num_turns_final_mod", disabled=st.session_state.conversation_started)
st.sidebar.number_input("Num Turns per User Click of Continue", value=num_turns, key="num_turns",
                        disabled=st.session_state.conversation_started)
st.sidebar.checkbox("Show Chain of Thoughts Details", value=show_cot, key="show_cot",
                    disabled=st.session_state.conversation_started)

# Reset conversation button
reset_clicked = st.sidebar.button("Reset Conversation")

with st.sidebar.expander("Edit in-memory session secrets" if on_hf_spaces else "Edit .env",
                         expanded=on_hf_spaces):
    dotenv_dict = get_dotenv_values()
    new_env = {}
    for k, v in dotenv_dict.items():
        new_env[k] = st.text_input(k, value=v, key=k, disabled=st.session_state.conversation_started,
                                   type="password")
        # Mirror every edited value into the session secrets used by the model calls.
        st.session_state.secrets[k] = new_env[k]
    save_secrets_clicked = st.button("Save dotenv" if not on_hf_spaces else "Save secrets to memory")

    if save_secrets_clicked:
        if on_hf_spaces:
            st.success("secrets temporarily stored to your session memory only")
        else:
            # Fix: the original read `st.session_state.user_secrets`, a key that is
            # never set; the edited values live in `st.session_state.secrets`.
            save_env_vars(st.session_state.secrets)
            st.success("dotenv saved to .env file")

if reset_clicked:
    st.session_state.messages = []
    st.session_state.turn_count = 0
    st.sidebar.write(f"Turn count: {st.session_state.turn_count}")
    st.session_state.input_key += 1
    st.session_state.conversation_started = False
    st.session_state.generator = None  # Reset the generator
    reset_clicked = False
    st.session_state.output_tokens = 0
    st.session_state.input_tokens = 0
    st.session_state.cache_creation_input_tokens = 0
    st.session_state.cache_read_input_tokens = 0
    st.rerun()

# Every script run (i.e. any widget click, including "Continue Conversation")
# clears the pause flag so the streaming loop below resumes.
st.session_state.waiting_for_continue = False

# Display debug information
st.sidebar.write(f"Turn count: {st.session_state.turn_count}")
num_messages = len([x for x in st.session_state.messages if x.get('role', '') == 'assistant'])
st.sidebar.write(f"Number of AI messages: {num_messages}")
st.sidebar.write(f"Conversation started: {st.session_state.conversation_started}")
st.sidebar.write(f"Output tokens: {st.session_state.output_tokens}")
st.sidebar.write(f"Input tokens: {st.session_state.input_tokens}")
st.sidebar.write(f"Cache creation input tokens: {st.session_state.cache_creation_input_tokens}")
st.sidebar.write(f"Cache read input tokens: {st.session_state.cache_read_input_tokens}")

# Handle user input
if not st.session_state.conversation_started:
    prompt = st.text_area("What would you like to ask?", value=initial_prompt,
                          key=f"input_{st.session_state.input_key}", height=500)
    st.session_state.prompt = prompt
    answer = st.text_area("Expected answer (Empty if do not know)", value=expected_answer,
                          key=f"answer_{st.session_state.input_key}", height=100)
    st.session_state.answer = answer
    system_prompt = st.text_area("Base System Prompt", value=system_prompt,
                                 key=f"system_prompt_{st.session_state.input_key}", height=200)
    st.session_state.system_prompt = system_prompt
else:
    st.session_state.conversation_started = True
    st.session_state.input_key += 1

# Display chat history
chat_container = st.container()
with chat_container:
    display_chat()

# Process conversation
current_assistant_message = ''
assistant_placeholder = None

# Token counters that a "usage" chunk may report.
_usage_counters = (
    "output_tokens",
    "input_tokens",
    "cache_creation_input_tokens",
    "cache_read_input_tokens",
)

try:
    while True:
        # Nothing to stream while paused for "Continue" or before the
        # conversation starts. The original busy-waited here with sleep/continue;
        # ending the script run instead is equivalent for the user because the
        # next widget click reruns the script, and it frees the script thread.
        if st.session_state.waiting_for_continue or not st.session_state.conversation_started:
            break
        if st.session_state.generator is None:
            # The generator is kept in session state so it survives reruns.
            st.session_state.generator = manage_conversation(
                model=st.session_state["model_name"],
                system=st.session_state.system_prompt,
                initial_prompt=st.session_state.prompt,
                next_prompts=st.session_state.next_prompts,
                final_prompt=st.session_state.final_prompt,
                num_turns_final_mod=st.session_state.num_turns_final_mod,
                num_turns=st.session_state.num_turns,
                temperature=st.session_state.temperature,
                max_tokens=st.session_state.max_tokens,
                seed=st.session_state.seed,
                secrets=st.session_state.secrets,
                verbose=st.session_state.verbose,
            )
        chunk = next(st.session_state.generator)

        if chunk["role"] == "assistant":
            if not chunk.get('final', False) and not chunk.get('turn_title', False):
                current_assistant_message += chunk["content"]
                if assistant_placeholder is None:
                    assistant_placeholder = st.empty()  # Placeholder for assistant's message

                # Update the assistant container with the progressively streaming message
                with assistant_placeholder.container():
                    # Update in the same chat message
                    with st.expander("Chain of Thoughts", expanded=st.session_state["show_cot"]):
                        st.chat_message("assistant").markdown(current_assistant_message,
                                                              unsafe_allow_html=True)
            if chunk.get('turn_title'):
                st.session_state.messages.append(
                    {"role": "assistant", "content": chunk['content'], 'turn_title': True,
                     'thinking_time': chunk['thinking_time'],
                     'total_thinking_time': chunk['total_thinking_time']})
                display_turn_title(chunk)
            if chunk.get('final'):
                # user role would normally do this, but on final step needs to be here
                st.session_state.messages.append(
                    {"role": "assistant", "content": current_assistant_message, 'final': False})
                # last message, so won't reach user turn, so need to store final assistant message from parsing
                st.session_state.messages.append(
                    {"role": "assistant", "content": chunk['content'], 'final': True})
                display_final(chunk, can_rerun=True)

        elif chunk["role"] == "user":
            if current_assistant_message:
                st.session_state.messages.append(
                    {"role": "assistant", "content": current_assistant_message,
                     'final': chunk.get('final', False)})
            # Reset assistant message when user provides input
            # Display user message
            if chunk["initial"] or st.session_state.show_next:
                user_container = st.chat_message("user")
                with user_container:
                    st.markdown(_md_breaks(chunk["content"]), unsafe_allow_html=True)
            st.session_state.messages.append(
                {"role": "user", "content": chunk["content"], 'initial': chunk["initial"]})
            st.session_state.turn_count += 1
            if current_assistant_message:
                assistant_placeholder = st.empty()  # Reset placeholder
                current_assistant_message = ""

        elif chunk["role"] == "action":
            if chunk["content"] in ["continue?"]:
                # Continue conversation button; clicking it reruns the script,
                # which clears `waiting_for_continue` above.
                continue_clicked = st.button("Continue Conversation")
                st.session_state.waiting_for_continue = True
                st.session_state.turn_count += 1
                if current_assistant_message:
                    st.session_state.messages.append(
                        {"role": "assistant", "content": current_assistant_message})
                    assistant_placeholder = st.empty()  # Reset placeholder
                    current_assistant_message = ""
            elif chunk["content"] == "end":
                break

        elif chunk["role"] == "usage":
            # Accumulate whichever counters this usage report contains.
            usage = chunk["content"]
            for counter in _usage_counters:
                st.session_state[counter] += usage[counter] if counter in usage else 0

        time.sleep(0.001)  # Small delay to prevent excessive updates

except StopIteration:
    # The conversation generator is exhausted; nothing more to stream.
    pass