import asyncio import time from clip_transform import CLIPTransform from chat_service import ChatService from dotenv import load_dotenv from speech_service import SpeechService from concurrent.futures import ThreadPoolExecutor from audio_stream_processor import AudioStreamProcessor from streaming_chat_service import StreamingChatService from pipeline import Pipeline, Node, Job from typing import List class ChatJob(Job): def __init__(self, data, chat_service: ChatService): super().__init__(data) self.chat_service = chat_service class Node1(Node): next_id = 0 async def process_job(self, job: ChatJob): # input job.data is the input string # output job.data is the next sentance async for sentence in job.chat_service.get_responses_as_sentances_async(job.data): if job.chat_service.ignore_sentence(sentence): continue print(f"{sentence}") new_job = ChatJob(sentence, job.chat_service) new_job.id = self.next_id self.next_id += 1 yield new_job class Node2(Node): next_id = 0 async def process_job(self, job: ChatJob): # input job.data is the sentance # output job.data is the streamed speech bytes async for chunk in job.chat_service.get_speech_chunks_async(job.data): new_job = ChatJob(chunk, job.chat_service) new_job.id = self.next_id self.next_id += 1 yield new_job class Node3(Node): # sync_size = 64 # sync = [] async def process_job(self, job: ChatJob): # input job.data is the streamed speech bytes # Node3.sync.append(job.data) job.chat_service.enqueue_speech_bytes_to_play([job.data]) yield job # if len(Node3.sync) >= Node3.sync_size: # audio_chunks = Node3.sync[:Node3.sync_size] # Node3.sync = Node3.sync[Node3.sync_size:] # job.chat_service.enqueue_speech_bytes_to_play(audio_chunks) # yield job class ChatPipeline(): def __init__(self): load_dotenv() self.pipeline = Pipeline() self.audio_processor = AudioStreamProcessor() self.chat_service = StreamingChatService(self.audio_processor, voice_id="2OviOUQc1JsQRQgNkVBj") # Chales003 def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): self.audio_processor.close() self.audio_processor = None def __del__(self): if self.audio_processor: self.audio_processor.close() self.audio_processor = None async def start(self): self.node1_queue = asyncio.Queue() self.node2_queue = asyncio.Queue() self.node3_queue = asyncio.Queue() self.sync = [] await self.pipeline.add_node(Node1, 1, self.node1_queue, self.node2_queue, sequential_node=True) await self.pipeline.add_node(Node2, 1, self.node2_queue, self.node3_queue, sequential_node=True) await self.pipeline.add_node(Node3, 1, self.node3_queue, None, sequential_node=True) async def enqueue(self, prompt): job = ChatJob(prompt, self.chat_service) await self.pipeline.enqueue_job(job) async def wait_until_all_jobs_idle(self): # TODO - implement this while True: await asyncio.sleep(0.1)