File size: 4,192 Bytes
bcea2ea
 
 
 
 
afc1e50
bcea2ea
 
 
 
 
 
 
 
 
 
afc1e50
bcea2ea
 
 
afc1e50
 
 
bcea2ea
 
 
afc1e50
 
 
 
 
 
 
 
 
bcea2ea
 
 
 
 
 
 
 
 
afc1e50
bcea2ea
 
 
afc1e50
 
 
 
 
 
 
 
 
 
 
 
 
bcea2ea
 
 
 
 
 
 
 
 
 
 
afc1e50
bcea2ea
afc1e50
 
 
 
bcea2ea
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
afc1e50
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import asyncio

import ray
from dotenv import load_dotenv
from ray.util.queue import Empty
from ray.util.queue import Queue

from audio_stream_processor import AudioStreamProcessor
from streaming_chat_service import StreamingChatService
# from ray.actor import ActorHandle

@ray.remote
class PromptToLLMActor:
    """Pipeline stage that turns prompts into LLM response sentences.

    Reads prompts from ``input_queue``, streams the chat service's response
    sentence by sentence, and forwards every sentence the service does not
    ask to ignore onto ``output_queue``.
    """

    def __init__(self, input_queue, output_queue, voice_id):
        """Create the stage.

        Args:
            input_queue: ``ray.util.queue.Queue`` the prompts are read from.
            output_queue: ``ray.util.queue.Queue`` the sentences are written to.
            voice_id: Voice identifier forwarded to ``StreamingChatService``.
        """
        # Actors run in their own worker process, so .env is loaded here;
        # presumably the chat/audio services read credentials from it — confirm.
        load_dotenv()
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.audio_processor = AudioStreamProcessor()
        self.chat_service = StreamingChatService(self.audio_processor, voice_id=voice_id)
        # Event for the prompt currently being answered; None until the first one.
        self.cancel_event = None

    async def run(self):
        """Answer prompts forever; meant to be started once via ``run.remote()``."""
        while True:
            prompt = await self.input_queue.get_async()
            # A fresh event per prompt so a previous cancel() does not abort it.
            cancel_event = asyncio.Event()
            self.cancel_event = cancel_event
            async for sentence in self.chat_service.get_responses_as_sentances_async(prompt, cancel_event):
                # Stop forwarding once cancel() fired, even if the service still
                # yields a buffered sentence; it would otherwise land in the
                # queue cancel() just flushed.
                if cancel_event.is_set():
                    break
                if self.chat_service.ignore_sentence(sentence):
                    continue
                print(sentence)
                await self.output_queue.put_async(sentence)

    def cancel(self):
        """Abort the in-flight response and discard everything still queued.

        Sets the current prompt's cancel event (if any prompt has started),
        then empties both the input and the output queue.
        """
        if self.cancel_event is not None:
            self.cancel_event.set()
        # Drain with non-blocking gets. An ``empty()`` check followed by a
        # blocking ``get()`` races with other consumers (this actor's pending
        # ``get_async`` in run(), the downstream stage) and can block forever
        # once one of them takes the last item.
        for q in (self.input_queue, self.output_queue):
            while True:
                try:
                    q.get_nowait()
                except Empty:
                    break

@ray.remote
class LLMSentanceToSpeechActor:
    """Pipeline stage that turns LLM sentences into speech audio chunks.

    Reads sentences from ``input_queue``, asks the chat service for the
    corresponding speech chunks, and forwards each chunk onto ``output_queue``.
    """

    def __init__(self, input_queue, output_queue, voice_id):
        """Create the stage.

        Args:
            input_queue: ``ray.util.queue.Queue`` the sentences are read from.
            output_queue: ``ray.util.queue.Queue`` the audio chunks are written to.
            voice_id: Voice identifier forwarded to ``StreamingChatService``.
        """
        # Actors run in their own worker process, so .env is loaded here;
        # presumably the chat/audio services read credentials from it — confirm.
        load_dotenv()
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.audio_processor = AudioStreamProcessor()
        self.chat_service = StreamingChatService(self.audio_processor, voice_id=voice_id)
        # Event for the sentence currently being synthesized; None until the first one.
        self.cancel_event = None

    async def run(self):
        """Synthesize sentences forever; meant to be started once via ``run.remote()``."""
        while True:
            sentence = await self.input_queue.get_async()
            # A fresh event per sentence so a previous cancel() does not abort it.
            cancel_event = asyncio.Event()
            self.cancel_event = cancel_event
            async for chunk in self.chat_service.get_speech_chunks_async(sentence, cancel_event):
                # Stop forwarding once cancel() fired so stale audio does not
                # refill the queue cancel() just flushed.
                if cancel_event.is_set():
                    break
                await self.output_queue.put_async(chunk)

    def cancel(self):
        """Abort the in-flight synthesis and discard everything still queued.

        Sets the current sentence's cancel event (if any sentence has started),
        then empties both the input and the output queue.
        """
        if self.cancel_event is not None:
            self.cancel_event.set()
        # Drain with non-blocking gets. An ``empty()`` check followed by a
        # blocking ``get()`` races with other consumers (this actor's pending
        # ``get_async`` in run(), the downstream stage) and can block forever
        # once one of them takes the last item.
        for q in (self.input_queue, self.output_queue):
            while True:
                try:
                    q.get_nowait()
                except Empty:
                    break


@ray.remote
class SpeechToSpeakerActor:
    """Final pipeline stage that hands audio chunks to the chat service for playback.

    Pulls chunks from ``input_queue`` and passes each one, wrapped in a
    single-element list, to ``StreamingChatService.enqueue_speech_bytes_to_play``.
    """

    def __init__(self, input_queue, voice_id):
        """Create the stage.

        Args:
            input_queue: ``ray.util.queue.Queue`` the audio chunks are read from.
            voice_id: Voice identifier forwarded to ``StreamingChatService``.
        """
        # Actors run in their own worker process, so .env is loaded here;
        # presumably the chat/audio services read credentials from it — confirm.
        load_dotenv()
        self.input_queue = input_queue
        self.audio_processor = AudioStreamProcessor()
        self.chat_service = StreamingChatService(self.audio_processor, voice_id=voice_id)

    async def run(self):
        """Forward chunks forever; meant to be started once via ``run.remote()``."""
        while True:
            audio_chunk = await self.input_queue.get_async()
            # NOTE(review): synchronous call inside the actor's event loop; if it
            # blocks, other calls on this actor (e.g. cancel) wait until it returns.
            self.chat_service.enqueue_speech_bytes_to_play([audio_chunk])

    def cancel(self):
        """Discard audio chunks that are still waiting in ``input_queue``.

        Does not touch audio that has already been handed to the chat service.
        """
        # Drain with non-blocking gets. An ``empty()`` check followed by a
        # blocking ``get()`` races with this actor's own pending ``get_async``
        # in run(); if that takes the last chunk, ``get()`` blocks forever and
        # the caller waiting on cancel() hangs.
        while True:
            try:
                self.input_queue.get_nowait()
            except Empty:
                break

@ray.remote
class RespondToPromptActor:
    """Entry point of the prompt -> LLM -> speech -> speaker pipeline.

    Owns the three inter-stage queues and the three stage actors, starts each
    stage's ``run`` loop on construction, and exposes ``enqueue_prompt`` to
    submit a new prompt, interrupting whatever is currently in progress.
    """

    def __init__(self, voice_id="2OviOUQc1JsQRQgNkVBj", queue_maxsize=100):
        """Build the queues and stage actors and start the pipeline.

        Args:
            voice_id: Voice identifier passed to every stage's
                ``StreamingChatService``. Defaults to the voice that was
                previously hard-coded here.
            queue_maxsize: Capacity of each inter-stage
                ``ray.util.queue.Queue`` (previously hard-coded to 100).
        """
        self.prompt_queue = Queue(maxsize=queue_maxsize)
        self.llm_sentence_queue = Queue(maxsize=queue_maxsize)
        self.speech_chunk_queue = Queue(maxsize=queue_maxsize)

        # Each stage reads from the previous stage's output queue.
        self.prompt_to_llm = PromptToLLMActor.remote(
            self.prompt_queue, self.llm_sentence_queue, voice_id)
        self.llm_sentence_to_speech = LLMSentanceToSpeechActor.remote(
            self.llm_sentence_queue, self.speech_chunk_queue, voice_id)
        self.speech_to_speaker = SpeechToSpeakerActor.remote(
            self.speech_chunk_queue, voice_id)

        # Start the pipeline components. Their run() loops never return, so
        # the resulting ObjectRefs are deliberately not waited on.
        self.prompt_to_llm.run.remote()
        self.llm_sentence_to_speech.run.remote()
        self.speech_to_speaker.run.remote()

    def enqueue_prompt(self, prompt):
        """Interrupt the pipeline and submit ``prompt`` as the new request.

        Asks all three stages to cancel in parallel and waits until every
        cancel() call has returned, so items queued for the previous prompt
        are discarded before the new prompt is enqueued.
        """
        print("flush anything queued")
        cancel_refs = [
            self.prompt_to_llm.cancel.remote(),
            self.llm_sentence_to_speech.cancel.remote(),
            self.speech_to_speaker.cancel.remote(),
        ]
        # Blocking ray.get is acceptable here: this actor has no async methods.
        ray.get(cancel_refs)
        self.prompt_queue.put(prompt)
        print("Enqueued prompt")