project_charles / respond_to_prompt_actor.py
sohojoe's picture
create respond to prompt actor
bcea2ea
raw
history blame
3.01 kB
import ray
from ray.util.queue import Queue
from dotenv import load_dotenv
from audio_stream_processor import AudioStreamProcessor
from streaming_chat_service import StreamingChatService
# from ray.actor import ActorHandle
@ray.remote
class PromptToLLMActor:
def __init__(self, input_queue, output_queue, voice_id):
load_dotenv()
self.input_queue = input_queue
self.output_queue = output_queue
self.audio_processor = AudioStreamProcessor()
self.chat_service = StreamingChatService(self.audio_processor, voice_id=voice_id)
async def run(self):
while True:
prompt = self.input_queue.get()
async for sentence in self.chat_service.get_responses_as_sentances_async(prompt):
if self.chat_service.ignore_sentence(sentence):
continue
print(f"{sentence}")
self.output_queue.put(sentence)
@ray.remote
class LLMSentanceToSpeechActor:
def __init__(self, input_queue, output_queue, voice_id):
load_dotenv()
self.input_queue = input_queue
self.output_queue = output_queue
self.audio_processor = AudioStreamProcessor()
self.chat_service = StreamingChatService(self.audio_processor, voice_id=voice_id)
async def run(self):
while True:
sentance = self.input_queue.get()
async for chunk in self.chat_service.get_speech_chunks_async(sentance):
self.output_queue.put(chunk)
@ray.remote
class SpeechToSpeakerActor:
def __init__(self, input_queue, voice_id):
load_dotenv()
self.input_queue = input_queue
self.audio_processor = AudioStreamProcessor()
self.chat_service = StreamingChatService(self.audio_processor, voice_id=voice_id)
async def run(self):
while True:
audio_chunk = self.input_queue.get()
self.chat_service.enqueue_speech_bytes_to_play([audio_chunk])
@ray.remote
class RespondToPromptActor:
def __init__(self):
voice_id="2OviOUQc1JsQRQgNkVBj"
self.prompt_queue = Queue(maxsize=100)
self.llm_sentence_queue = Queue(maxsize=100)
self.speech_chunk_queue = Queue(maxsize=100)
self.prompt_to_llm = PromptToLLMActor.remote(self.prompt_queue, self.llm_sentence_queue, voice_id)
self.llm_sentence_to_speech = LLMSentanceToSpeechActor.remote(self.llm_sentence_queue, self.speech_chunk_queue, voice_id)
self.speech_to_speaker = SpeechToSpeakerActor.remote(self.speech_chunk_queue, voice_id)
# Start the pipeline components.
print ("Starting pipeline components")
self.prompt_to_llm.run.remote()
print ("prompt_to_llm running")
self.llm_sentence_to_speech.run.remote()
print ("llm_sentence_to_speech running")
self.speech_to_speaker.run.remote()
print ("speech_to_speaker running")
def enqueue_prompt(self, prompt):
self.prompt_queue.put(prompt)