"""Ray actor pipeline: prompt -> LLM sentences -> speech chunks -> audio sink.

Each stage is a Ray actor that reads from one ``ray.util.queue.Queue`` and
writes to the next one. ``RespondToPromptActor`` owns the queues, creates the
stages and starts their ``run()`` loops.
"""

import asyncio

import ray
from dotenv import load_dotenv
from ray.util.queue import Empty, Queue

from audio_stream_processor import AudioStreamProcessor
from streaming_chat_service import StreamingChatService

# from ray.actor import ActorHandle


def _drain_queue(queue):
    """Discard every item currently held by *queue* without blocking.

    ``get_nowait()`` is used instead of an ``empty()`` check followed by a
    blocking ``get()``: each stage's ``run()`` loop is a competing consumer of
    the same queue, so it can take the last item between the check and the
    ``get()``, which would leave the blocking ``get()`` waiting indefinitely.
    """
    while True:
        try:
            queue.get_nowait()
        except Empty:
            return


@ray.remote
class PromptToLLMActor:
    """Turn prompts from ``input_queue`` into sentences on ``output_queue``."""

    def __init__(self, input_queue, output_queue, voice_id):
        """Store the queues and build the chat service for *voice_id*."""
        load_dotenv()
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.audio_processor = AudioStreamProcessor()
        self.chat_service = StreamingChatService(
            self.audio_processor, voice_id=voice_id
        )
        # Replaced by a fresh asyncio.Event for every prompt taken in run().
        self.cancel_event = None

    async def run(self):
        """Loop forever: take a prompt, stream its sentences downstream.

        Sentences for which ``chat_service.ignore_sentence`` is truthy are
        skipped; every other sentence is printed and put on ``output_queue``.
        """
        while True:
            prompt = await self.input_queue.get_async()
            self.cancel_event = asyncio.Event()
            sentences = self.chat_service.get_responses_as_sentances_async(
                prompt, self.cancel_event
            )
            async for sentence in sentences:
                if self.chat_service.ignore_sentence(sentence):
                    continue
                print(f"{sentence}")
                await self.output_queue.put_async(sentence)

    def cancel(self):
        """Set the current cancel event (if any) and empty both queues."""
        if self.cancel_event:
            self.cancel_event.set()
        _drain_queue(self.input_queue)
        _drain_queue(self.output_queue)


@ray.remote
class LLMSentanceToSpeechActor:
    """Turn sentences from ``input_queue`` into speech chunks on ``output_queue``."""

    def __init__(self, input_queue, output_queue, voice_id):
        """Store the queues and build the chat service for *voice_id*."""
        load_dotenv()
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.audio_processor = AudioStreamProcessor()
        self.chat_service = StreamingChatService(
            self.audio_processor, voice_id=voice_id
        )
        # Replaced by a fresh asyncio.Event for every sentence taken in run().
        self.cancel_event = None

    async def run(self):
        """Loop forever: take a sentence, forward each speech chunk it yields."""
        while True:
            sentance = await self.input_queue.get_async()
            self.cancel_event = asyncio.Event()
            chunks = self.chat_service.get_speech_chunks_async(
                sentance, self.cancel_event
            )
            async for chunk in chunks:
                await self.output_queue.put_async(chunk)

    def cancel(self):
        """Set the current cancel event (if any) and empty both queues."""
        if self.cancel_event:
            self.cancel_event.set()
        _drain_queue(self.input_queue)
        _drain_queue(self.output_queue)


@ray.remote
class SpeechToSpeakerActor:
    """Hand audio chunks from ``input_queue`` to the chat service for playback."""

    def __init__(self, input_queue, voice_id):
        """Store the queue and build the chat service for *voice_id*."""
        load_dotenv()
        self.input_queue = input_queue
        self.audio_processor = AudioStreamProcessor()
        self.chat_service = StreamingChatService(
            self.audio_processor, voice_id=voice_id
        )

    async def run(self):
        """Loop forever: take a chunk and enqueue it for playback."""
        while True:
            audio_chunk = await self.input_queue.get_async()
            self.chat_service.enqueue_speech_bytes_to_play([audio_chunk])

    def cancel(self):
        """Discard every chunk still waiting on ``input_queue``."""
        _drain_queue(self.input_queue)


@ray.remote
class SpeechToConverterActor:
    """Forward audio chunks from ``input_queue`` to an ffmpeg converter actor."""

    def __init__(self, input_queue, ffmpeg_converter_actor):
        """Store the queue and the handle of the downstream converter actor."""
        load_dotenv()
        self.input_queue = input_queue
        self.ffmpeg_converter_actor = ffmpeg_converter_actor

    async def run(self):
        """Loop forever: take a chunk and await ``push_chunk`` on the converter."""
        while True:
            audio_chunk = await self.input_queue.get_async()
            await self.ffmpeg_converter_actor.push_chunk.remote(audio_chunk)

    def cancel(self):
        """Discard every chunk still waiting on ``input_queue``."""
        _drain_queue(self.input_queue)


@ray.remote
class RespondToPromptActor:
    """Own the queues and stage actors of the prompt-to-speech pipeline."""

    def __init__(self, ffmpeg_converter_actor, voice_id="2OviOUQc1JsQRQgNkVBj"):
        """Create the queues and stage actors, then start every ``run()`` loop.

        Args:
            ffmpeg_converter_actor: Actor handle given to the final stage,
                which calls ``push_chunk`` on it.
            voice_id: Voice identifier passed to the LLM and speech stages.
        """
        self.prompt_queue = Queue(maxsize=100)
        self.llm_sentence_queue = Queue(maxsize=100)
        self.speech_chunk_queue = Queue(maxsize=100)

        self.prompt_to_llm = PromptToLLMActor.remote(
            self.prompt_queue, self.llm_sentence_queue, voice_id
        )
        self.llm_sentence_to_speech = LLMSentanceToSpeechActor.remote(
            self.llm_sentence_queue, self.speech_chunk_queue, voice_id
        )
        # NOTE: alternative sink that plays through the chat service instead:
        # self.speech_output = SpeechToSpeakerActor.remote(self.speech_chunk_queue, voice_id)
        self.speech_output = SpeechToConverterActor.remote(
            self.speech_chunk_queue, ffmpeg_converter_actor
        )

        # Start the pipeline components.
        self.prompt_to_llm.run.remote()
        self.llm_sentence_to_speech.run.remote()
        self.speech_output.run.remote()

    def enqueue_prompt(self, prompt):
        """Cancel and flush every stage, then queue *prompt* for processing.

        Blocks until all three ``cancel()`` calls have returned, so the new
        prompt is only queued after the flush has completed.
        """
        print("flush anything queued")
        pending_cancels = [
            self.prompt_to_llm.cancel.remote(),
            self.llm_sentence_to_speech.cancel.remote(),
            self.speech_output.cancel.remote(),
        ]
        ray.get(pending_cancels)
        self.prompt_queue.put(prompt)
        print("Enqueued prompt")