import ray
import time
import asyncio
import os


@ray.remote
class CharlesActor:
    """Ray actor that feeds incoming audio to speech-to-text and forwards prompts.

    The actor lazily builds its helper queue and sub-actors on the first call to
    ``start`` and then loops forever, publishing a human-readable status string
    (``get_state``) and a markdown table of recent prompts
    (``get_system_one_audio_history_output``).
    """

    # Maximum number of recognized prompts kept in the history table.
    _MAX_AUDIO_HISTORY = 10

    def __init__(self):
        # True until _initalize_resources() has completed successfully.
        self._needs_init = True
        # Markdown table of the most recent prompts (empty until one arrives).
        self._system_one_audio_history_output = ""
        # Human-readable status, polled by the driver script.
        self._state = "Initializing"

    def get_state(self):
        """Return the current human-readable status string."""
        return self._state

    def get_system_one_audio_history_output(self):
        """Return the markdown table of recent prompts (may be empty)."""
        return self._system_one_audio_history_output

    @staticmethod
    def _format_history_table(history):
        """Render *history* (oldest first) as a markdown table, newest row first."""
        header = "| System 1 Audio History |\n| --- |\n"
        rows = "\n".join(f"| {item} |" for item in reversed(history))
        return header + rows

    async def _initalize_resources(self):
        """Create the AV queue and the helper actors used by ``start``.

        Project modules are imported here rather than at module import time;
        the numbered prints are progress markers for debugging start-up.
        """
        print("000")
        from streamlit_av_queue import StreamlitAVQueue
        self._streamlit_av_queue = StreamlitAVQueue()
        self._out_audio_queue = self._streamlit_av_queue.get_out_audio_queue()

        print("001")
        from ffmpeg_converter_actor import FFMpegConverterActor
        self._ffmpeg_converter_actor = FFMpegConverterActor.remote(self._out_audio_queue)
        await self._ffmpeg_converter_actor.start_process.remote()
        # Fire-and-forget: the returned reference is intentionally not awaited.
        self._ffmpeg_converter_actor.run.remote()

        print("002")
        from speech_to_text_vosk_actor import SpeechToTextVoskActor
        self._speech_to_text_actor = SpeechToTextVoskActor.remote()

        print("003")
        from respond_to_prompt_actor import RespondToPromptActor
        self._respond_to_prompt_actor = RespondToPromptActor.remote(self._ffmpeg_converter_actor)

        # Prompts placed here are sent to the responder before any audio input.
        self._debug_queue = [
            # "hello, how are you today?",
            # "hmm, interesting, tell me more about that.",
        ]

        print("010")
        # Mark initialization as done so a later start() call on the same
        # actor does not rebuild every resource (previously left as True).
        self._needs_init = False
        self._state = "Initialized"

    async def start(self):
        """Run the main processing loop; does not return under normal operation.

        Each iteration: drain one debug prompt (if any), submit newly received
        audio frames for speech-to-text, consume the oldest finished
        speech-to-text result, and refresh the status string.
        """
        if self._needs_init:
            await self._initalize_resources()

        system_one_audio_history = []

        self._state = "Waiting for input"
        total_video_frames = 0
        total_audio_frames = 0
        loops = 0

        # Pending speech-to-text results, oldest first.
        process_speech_to_text_future = []

        while True:
            if self._debug_queue:
                prompt = self._debug_queue.pop(0)
                await self._respond_to_prompt_actor.enqueue_prompt.remote(prompt)

            audio_frames = await self._streamlit_av_queue.get_in_audio_frames_async()
            if len(audio_frames) > 0:
                total_audio_frames += len(audio_frames)
                # Concatenate all audio frames into a single buffer
                audio_buffer = b"".join(buffer.tobytes() for buffer in audio_frames)
                future = self._speech_to_text_actor.process_speech.remote(audio_buffer)
                process_speech_to_text_future.append(future)
            # audio_frames_task = None

            if process_speech_to_text_future:
                # timeout=0 makes this a poll of the oldest pending result only,
                # so results are consumed in submission order.
                ready, _ = ray.wait([process_speech_to_text_future[0]], timeout=0)
                if ready:
                    prompt, speaker_finished = await process_speech_to_text_future[0]
                    del process_speech_to_text_future[0]

                    if speaker_finished and len(prompt) > 0:
                        print(f"Prompt: {prompt}")
                        system_one_audio_history.append(prompt)
                        if len(system_one_audio_history) > self._MAX_AUDIO_HISTORY:
                            system_one_audio_history = system_one_audio_history[-self._MAX_AUDIO_HISTORY:]
                        self._system_one_audio_history_output = self._format_history_table(
                            system_one_audio_history
                        )
                        await self._respond_to_prompt_actor.enqueue_prompt.remote(prompt)

            # video_frames = await self._streamlit_av_queue.get_video_frames_async()
            # if len(video_frames) > 0:
            #     total_video_frames += len(video_frames)
            #     # for video_frame in video_frames:
            #     #     system_one_video_output.image(video_frame.to_ndarray())
            #     #     pass

            # update debug output
            if total_video_frames > 0 or total_audio_frames > 0:
                self._state = f"Processed {total_video_frames} video frames and {total_audio_frames} audio frames"
            # Yield to the event loop so other actor methods can run.
            await asyncio.sleep(0.01)
            loops += 1
            self._state = f"Processed {total_video_frames} video frames and {total_audio_frames} audio frames, loops: {loops}"


if __name__ == "__main__":
    if not ray.is_initialized():
        # Try to connect to a running Ray cluster
        ray_address = os.getenv('RAY_ADDRESS')
        if ray_address:
            ray.init(ray_address, namespace="project_charles")
        else:
            ray.init(namespace="project_charles")

    # Reuse the named actor when one already exists in this namespace.
    charles_actor = CharlesActor.options(
        name="CharlesActor",
        get_if_exists=True,
    ).remote()
    future = charles_actor.start.remote()

    try:
        while True:
            ready, _ = ray.wait([future], timeout=0)
            if ready:
                # The start method has terminated. You can fetch the result (if any) with ray.get().
                # If the method raised an exception, it will be re-raised here.
                try:
                    result = ray.get(future)
                    print(f"The start method has terminated with result: {result}")
                except Exception as e:
                    print(f"The start method raised an exception: {e}")
                break
            else:
                # The start method is still running. You can poll for debug information here.
                time.sleep(1)
                state = charles_actor.get_state.remote()
                print(f"Charles is in state: {ray.get(state)}")
    except KeyboardInterrupt:
        print("Script was manually terminated")
        # Bare raise re-raises the active exception with its original traceback.
        raise