File size: 3,384 Bytes
730fe87
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import asyncio
import time
from clip_transform import CLIPTransform
from chat_service import ChatService
from dotenv import load_dotenv
from speech_service import SpeechService
from concurrent.futures import ThreadPoolExecutor
from audio_stream_processor import AudioStreamProcessor
from streaming_chat_service import StreamingChatService
from pipeline import Pipeline, Node, Job
from typing import List

class ChatJob(Job):
    """Pipeline job whose payload travels together with the chat service handling it."""

    def __init__(self, data, chat_service: ChatService):
        super().__init__(data)
        # Each stage re-wraps its output with this same service so downstream
        # nodes can call back into it.
        self.chat_service = chat_service

class Node1(Node):
    """First stage: split the chat response to a prompt into sentence jobs."""

    # Class-level default; the first `self.next_id += 1` creates a per-instance counter.
    next_id = 0

    async def process_job(self, job: ChatJob):
        """Yield one ChatJob per sentence of the chat response to ``job.data``.

        On input ``job.data`` is the prompt string; every yielded job carries a
        single sentence plus the same chat service. Sentences for which
        ``ignore_sentence`` returns truthy are skipped. Kept sentences are
        printed and stamped with a sequential ``id`` from this node's counter.
        """
        service = job.chat_service
        async for sentence in service.get_responses_as_sentances_async(job.data):
            if not service.ignore_sentence(sentence):
                print(f"{sentence}")
                sentence_job = ChatJob(sentence, service)
                sentence_job.id = self.next_id
                self.next_id += 1
                yield sentence_job
    
class Node2(Node):
    """Second stage: turn each sentence into a stream of speech-chunk jobs."""

    # Class-level default; the first increment creates a per-instance counter.
    next_id = 0

    async def process_job(self, job: ChatJob):
        """Yield one ChatJob per speech chunk generated for the sentence ``job.data``.

        Each yielded job carries one chunk from ``get_speech_chunks_async``
        together with the same chat service, and receives a sequential ``id``
        taken from this node's counter.
        """
        service = job.chat_service
        async for speech_chunk in service.get_speech_chunks_async(job.data):
            chunk_job = ChatJob(speech_chunk, service)
            # Stamp the current counter value, then advance it.
            chunk_job.id, self.next_id = self.next_id, self.next_id + 1
            yield chunk_job


class Node3(Node):
    """Final stage: hand each streamed speech chunk to the chat service for playback."""

    async def process_job(self, job: ChatJob):
        """Enqueue the speech chunk in ``job.data`` for playback, then pass the job on.

        Chunks are forwarded one at a time, each wrapped in a single-element
        list. (A variant that buffered chunks into fixed-size batches before
        enqueueing was sketched previously but is not in use.)
        """
        single_chunk_batch = [job.data]
        job.chat_service.enqueue_speech_bytes_to_play(single_chunk_batch)
        yield job

class ChatPipeline:
    """Streaming chat pipeline: prompt -> sentences -> speech chunks -> playback.

    Node1 splits the chat response into sentences, Node2 turns each sentence
    into speech chunks, and Node3 enqueues those chunks for playback. Use the
    instance as a context manager (``__del__`` is a fallback) so the audio
    processor is closed.
    """

    def __init__(self):
        load_dotenv()
        self.pipeline = Pipeline()
        self.audio_processor = AudioStreamProcessor()
        self.chat_service = StreamingChatService(self.audio_processor, voice_id="2OviOUQc1JsQRQgNkVBj") # Chales003

    def _close_audio_processor(self):
        """Close the audio processor at most once.

        Safe to call repeatedly (e.g. ``__exit__`` followed by ``__del__``) and
        on a partially constructed instance whose ``__init__`` raised before
        ``audio_processor`` was assigned.
        """
        audio_processor = getattr(self, "audio_processor", None)
        if audio_processor is None:
            return
        # Drop the reference before closing so a close() that raises is not
        # attempted a second time from __del__.
        self.audio_processor = None
        audio_processor.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Returns None, so exceptions raised inside the with-block propagate.
        self._close_audio_processor()

    def __del__(self):
        self._close_audio_processor()

    async def start(self):
        """Create the inter-stage queues and register Node1 -> Node2 -> Node3.

        Node1 reads ``node1_queue`` and writes ``node2_queue``; Node2 writes
        ``node3_queue``; Node3 has no output queue. The ``1`` passed to
        ``add_node`` is presumably the worker count per node — confirm against
        ``Pipeline.add_node``.
        """
        self.node1_queue = asyncio.Queue()
        self.node2_queue = asyncio.Queue()
        self.node3_queue = asyncio.Queue()
        # Not read anywhere in this file; kept in case external code relies on it.
        self.sync = []
        await self.pipeline.add_node(Node1, 1, self.node1_queue, self.node2_queue, sequential_node=True)
        await self.pipeline.add_node(Node2, 1, self.node2_queue, self.node3_queue, sequential_node=True)
        await self.pipeline.add_node(Node3, 1, self.node3_queue, None, sequential_node=True)

    async def enqueue(self, prompt):
        """Wrap ``prompt`` in a ChatJob bound to this pipeline's chat service and submit it."""
        job = ChatJob(prompt, self.chat_service)
        await self.pipeline.enqueue_job(job)

    async def wait_until_all_jobs_idle(self):
        """Placeholder that sleeps in 0.1 s steps forever.

        NOTE(review): this never returns; callers may currently depend on that
        to keep the event loop alive, so a real idle check must be introduced
        deliberately.
        """
        # TODO - implement this
        while True:
            await asyncio.sleep(0.1)