Spaces:
Sleeping
Sleeping
File size: 4,192 Bytes
bcea2ea afc1e50 bcea2ea afc1e50 bcea2ea afc1e50 bcea2ea afc1e50 bcea2ea afc1e50 bcea2ea afc1e50 bcea2ea afc1e50 bcea2ea afc1e50 bcea2ea afc1e50 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
import asyncio

import ray
from ray.util.queue import Empty, Queue

from dotenv import load_dotenv

from audio_stream_processor import AudioStreamProcessor
from streaming_chat_service import StreamingChatService

# from ray.actor import ActorHandle
@ray.remote
class PromptToLLMActor:
    """Pipeline stage 1: consume prompts and stream LLM responses,
    sentence by sentence, into the output queue.

    input_queue  -- ray.util.queue.Queue of prompt strings
    output_queue -- ray.util.queue.Queue receiving response sentences
    voice_id     -- forwarded to StreamingChatService (TTS voice selection)
    """
    def __init__(self, input_queue, output_queue, voice_id):
        load_dotenv()  # load API credentials for the chat/audio services
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.audio_processor = AudioStreamProcessor()
        self.chat_service = StreamingChatService(self.audio_processor, voice_id=voice_id)
        # Event for the response currently being streamed; None until first prompt.
        self.cancel_event = None

    async def run(self):
        """Loop forever: await a prompt, stream its response sentences out."""
        while True:
            prompt = await self.input_queue.get_async()
            self.cancel_event = asyncio.Event()
            # NOTE: "sentances" spelling is StreamingChatService's API, not a typo here.
            async for sentence in self.chat_service.get_responses_as_sentances_async(prompt, self.cancel_event):
                if self.chat_service.ignore_sentence(sentence):
                    continue
                print(f"{sentence}")
                await self.output_queue.put_async(sentence)

    def cancel(self):
        """Abort the in-flight response (if any) and flush both queues."""
        if self.cancel_event:
            self.cancel_event.set()
        # Drain without blocking: another consumer may empty the queue between
        # our empty() check and the get(), so a blocking get() could hang the
        # actor's event loop forever. get_nowait() + Empty handles that race.
        while not self.input_queue.empty():
            try:
                self.input_queue.get_nowait()
            except Empty:
                break
        while not self.output_queue.empty():
            try:
                self.output_queue.get_nowait()
            except Empty:
                break
@ray.remote
class LLMSentanceToSpeechActor:
    """Pipeline stage 2: convert response sentences to speech audio chunks.

    (Class name spelling is kept for backward compatibility with callers.)

    input_queue  -- ray.util.queue.Queue of sentence strings
    output_queue -- ray.util.queue.Queue receiving audio chunks
    voice_id     -- forwarded to StreamingChatService (TTS voice selection)
    """
    def __init__(self, input_queue, output_queue, voice_id):
        load_dotenv()  # load API credentials for the chat/audio services
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.audio_processor = AudioStreamProcessor()
        self.chat_service = StreamingChatService(self.audio_processor, voice_id=voice_id)
        # Event for the sentence currently being synthesized; None until first item.
        self.cancel_event = None

    async def run(self):
        """Loop forever: await a sentence, stream its audio chunks out."""
        while True:
            sentance = await self.input_queue.get_async()
            self.cancel_event = asyncio.Event()
            async for chunk in self.chat_service.get_speech_chunks_async(sentance, self.cancel_event):
                await self.output_queue.put_async(chunk)

    def cancel(self):
        """Abort the in-flight synthesis (if any) and flush both queues."""
        if self.cancel_event:
            self.cancel_event.set()
        # Non-blocking drain: a blocking get() after empty() is racy — the queue
        # can be emptied by another consumer in between, hanging this actor.
        while not self.input_queue.empty():
            try:
                self.input_queue.get_nowait()
            except Empty:
                break
        while not self.output_queue.empty():
            try:
                self.output_queue.get_nowait()
            except Empty:
                break
@ray.remote
class SpeechToSpeakerActor:
    """Pipeline stage 3 (sink): play received audio chunks on the speaker.

    input_queue -- ray.util.queue.Queue of audio chunks
    voice_id    -- forwarded to StreamingChatService for consistency with
                   the other stages
    """
    def __init__(self, input_queue, voice_id):
        load_dotenv()  # load API credentials for the audio services
        self.input_queue = input_queue
        self.audio_processor = AudioStreamProcessor()
        self.chat_service = StreamingChatService(self.audio_processor, voice_id=voice_id)

    async def run(self):
        """Loop forever: await an audio chunk and hand it to the playback queue."""
        while True:
            audio_chunk = await self.input_queue.get_async()
            self.chat_service.enqueue_speech_bytes_to_play([audio_chunk])

    def cancel(self):
        """Flush any audio chunks not yet handed to the playback service."""
        # Non-blocking drain: empty() followed by a blocking get() is racy and
        # could hang the actor if the queue empties in between.
        while not self.input_queue.empty():
            try:
                self.input_queue.get_nowait()
            except Empty:
                break
@ray.remote
class RespondToPromptActor:
    """Orchestrator: wires the three pipeline stages together with bounded
    queues (prompt -> LLM sentences -> speech chunks -> speaker) and starts
    each stage's run() loop.
    """

    # Default TTS voice; callers may override via the constructor.
    DEFAULT_VOICE_ID = "2OviOUQc1JsQRQgNkVBj"

    def __init__(self, voice_id=DEFAULT_VOICE_ID):
        """Build the pipeline.

        voice_id -- TTS voice passed to every stage; defaults to the
                    project's standard voice, so existing callers are
                    unaffected.
        """
        # maxsize bounds each queue so a stalled stage applies backpressure
        # instead of growing without limit.
        self.prompt_queue = Queue(maxsize=100)
        self.llm_sentence_queue = Queue(maxsize=100)
        self.speech_chunk_queue = Queue(maxsize=100)
        self.prompt_to_llm = PromptToLLMActor.remote(self.prompt_queue, self.llm_sentence_queue, voice_id)
        self.llm_sentence_to_speech = LLMSentanceToSpeechActor.remote(self.llm_sentence_queue, self.speech_chunk_queue, voice_id)
        self.speech_to_speaker = SpeechToSpeakerActor.remote(self.speech_chunk_queue, voice_id)
        # Start the pipeline components. Fire-and-forget: each run() loops
        # forever, so the ObjectRefs are intentionally not retained.
        self.prompt_to_llm.run.remote()
        self.llm_sentence_to_speech.run.remote()
        self.speech_to_speaker.run.remote()

    def enqueue_prompt(self, prompt):
        """Cancel any in-flight response, flush every stage, then enqueue prompt."""
        print("flush anything queued")
        prompt_to_llm_future = self.prompt_to_llm.cancel.remote()
        llm_sentence_to_speech_future = self.llm_sentence_to_speech.cancel.remote()
        speech_to_speaker_future = self.speech_to_speaker.cancel.remote()
        # Block until all three stages are flushed so the new prompt's output
        # cannot interleave with leftovers from the previous one.
        ray.get([
            prompt_to_llm_future,
            llm_sentence_to_speech_future,
            speech_to_speaker_future,
        ])
        self.prompt_queue.put(prompt)
        print("Enqueued prompt")
|