import asyncio
import traceback

import numpy as np
import pydub
import streamlit as st
from dotenv import load_dotenv
from streamlit_webrtc import WebRtcMode, webrtc_streamer
from vosk import SetLogLevel

from sample_utils.turn import get_ice_servers

SetLogLevel(-1)  # mute Vosk's verbose logging
load_dotenv()

webrtc_ctx = None


async def main():
    global webrtc_ctx  # otherwise the cleanup in __main__ only ever sees None

    system_one_audio_status = st.empty()
    playing = st.checkbox("Playing", value=True)
    system_one_audio_status.write("Initializing streaming")
    system_one_audio_output = st.empty()
    system_one_video_output = st.empty()
    system_one_audio_history = []
    system_one_audio_history_output = st.empty()

    # Lazily initialize shared resources once per Streamlit session.
    if "streamlit_av_queue" not in st.session_state:
        from streamlit_av_queue import StreamlitAVQueue
        st.session_state.streamlit_av_queue = StreamlitAVQueue()

    if "speech_to_text_vosk" not in st.session_state:
        from speech_to_text_vosk import SpeechToTextVosk
        st.session_state.speech_to_text_vosk = SpeechToTextVosk()

    if "chat_pipeline" not in st.session_state:
        from chat_pipeline import ChatPipeline
        st.session_state.chat_pipeline = ChatPipeline()
        await st.session_state.chat_pipeline.start()

    if "debug_queue" not in st.session_state:
        st.session_state.debug_queue = [
            # "hello, how are you today?",
            # "hmm, interesting, tell me more about that.",
        ]

    system_one_audio_status.write("Resources referenced")

    system_one_audio_status.write("Initializing webrtc_streamer")
    webrtc_ctx = webrtc_streamer(
        key="charles",
        desired_playing_state=playing,
        queued_audio_frames_callback=st.session_state.streamlit_av_queue.queued_audio_frames_callback,
        queued_video_frames_callback=st.session_state.streamlit_av_queue.queued_video_frames_callback,
        mode=WebRtcMode.SENDRECV,
        rtc_configuration={"iceServers": get_ice_servers()},
        async_processing=True,
    )

    if not webrtc_ctx.state.playing:
        return  # a bare `exit` is a no-op; stop the coroutine instead

    system_one_audio_status.write("Initializing speech")

    try:
        while True:
            if not webrtc_ctx.state.playing:
                system_one_audio_status.write("Stopped.")
                await asyncio.sleep(0.1)
                continue
            system_one_audio_status.write("Streaming.")

            # Feed any scripted debug prompts straight into the chat pipeline.
            if len(st.session_state.debug_queue) > 0:
                prompt = st.session_state.debug_queue.pop(0)
                await st.session_state.chat_pipeline.enqueue(prompt)

            # Drain queued audio frames, downmix to mono at the recognizer's
            # expected rate, and hand the raw PCM bytes to Vosk.
            sound_chunk = pydub.AudioSegment.empty()
            audio_frames = st.session_state.streamlit_av_queue.get_audio_frames()
            if len(audio_frames) > 0:
                for audio_frame in audio_frames:
                    sound = pydub.AudioSegment(
                        data=audio_frame.to_ndarray().tobytes(),
                        sample_width=audio_frame.format.bytes,
                        frame_rate=audio_frame.sample_rate,
                        channels=len(audio_frame.layout.channels),
                    )
                    sound = sound.set_channels(1)
                    sound = sound.set_frame_rate(
                        st.session_state.speech_to_text_vosk.get_audio_bit_rate()
                    )
                    sound_chunk += sound
                buffer = np.array(sound_chunk.get_array_of_samples())
                st.session_state.speech_to_text_vosk.add_speech_bytes(buffer.tobytes())

            # Once the speaker pauses, treat the finished transcript as a prompt.
            prompt, speaker_finished = st.session_state.speech_to_text_vosk.get_text()
            if speaker_finished and len(prompt) > 0:
                print(f"Prompt: {prompt}")
                system_one_audio_history.append(prompt)
                if len(system_one_audio_history) > 10:
                    system_one_audio_history = system_one_audio_history[-10:]
                # Render the rolling history as a one-column markdown table,
                # newest prompt first.
                table_content = "| System 1 Audio History |\n| --- |\n"
                table_content += "\n".join(
                    f"| {item} |" for item in reversed(system_one_audio_history)
                )
                system_one_audio_history_output.markdown(table_content)
                await st.session_state.chat_pipeline.enqueue(prompt)

            video_frames = st.session_state.streamlit_av_queue.get_video_frames()
            if len(video_frames) > 0:
                # for video_frame in video_frames:
                #     system_one_video_output.image(video_frame.to_ndarray())
                pass

            await asyncio.sleep(0.1)
    except Exception as e:
        print(f"An error occurred: {e}")
        traceback.print_exc()
        raise  # bare `raise` preserves the original traceback


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception:
        # Best-effort cleanup so a rerun starts from a clean session.
        webrtc_ctx = None
        if "streamlit_av_queue" in st.session_state:
            del st.session_state.streamlit_av_queue
        if "speech_to_text_vosk" in st.session_state:
            del st.session_state.speech_to_text_vosk
        if "chat_pipeline" in st.session_state:
            del st.session_state.chat_pipeline