# project_charles / chat_pipeline.py
# sohojoe's picture
# created chat_pipeline
# 730fe87
# raw
# history blame
# 3.38 kB
import asyncio
import time
from clip_transform import CLIPTransform
from chat_service import ChatService
from dotenv import load_dotenv
from speech_service import SpeechService
from concurrent.futures import ThreadPoolExecutor
from audio_stream_processor import AudioStreamProcessor
from streaming_chat_service import StreamingChatService
from pipeline import Pipeline, Node, Job
from typing import List
class ChatJob(Job):
    """Pipeline job that carries its payload together with a chat service.

    Args:
        data: Payload for this job; forwarded unchanged to ``Job.__init__``.
        chat_service: Service object stored on the job as ``chat_service``
            so whoever processes the job can call it.
    """

    def __init__(self, data, chat_service: ChatService):
        super().__init__(data)
        # Kept on the job itself so processing code reads it via
        # ``job.chat_service`` instead of holding its own reference.
        self.chat_service = chat_service
class Node1(Node):
    """First stage: expand a prompt into a stream of response sentences."""

    # Running counter used to number the jobs this node emits. It starts as
    # a class attribute; ``self.next_id += 1`` below rebinds it via ``self``.
    next_id = 0

    async def process_job(self, job: ChatJob):
        """Yield one ``ChatJob`` per response sentence that is not ignored.

        Args:
            job: Incoming job whose ``data`` is the input string.

        Yields:
            A new ``ChatJob`` whose ``data`` is the next sentence, sharing the
            incoming job's chat service and numbered with ``next_id``.
        """
        service = job.chat_service
        async for sentence in service.get_responses_as_sentances_async(job.data):
            # Only sentences the service does not ask to ignore go downstream.
            if not service.ignore_sentence(sentence):
                print(f"{sentence}")
                sentence_job = ChatJob(sentence, service)
                sentence_job.id = self.next_id
                self.next_id += 1
                yield sentence_job
class Node2(Node):
    """Second stage: convert one sentence into a stream of speech chunks."""

    # Running counter used to number the jobs this node emits. It starts as
    # a class attribute; ``self.next_id += 1`` below rebinds it via ``self``.
    next_id = 0

    async def process_job(self, job: ChatJob):
        """Yield one ``ChatJob`` per speech chunk produced for the sentence.

        Args:
            job: Incoming job whose ``data`` is the sentence.

        Yields:
            A new ``ChatJob`` whose ``data`` is a chunk of the streamed speech
            bytes, sharing the incoming job's chat service and numbered with
            ``next_id``.
        """
        service = job.chat_service
        async for speech_chunk in service.get_speech_chunks_async(job.data):
            speech_job = ChatJob(speech_chunk, service)
            speech_job.id = self.next_id
            self.next_id += 1
            yield speech_job
class Node3(Node):
    """Final stage: enqueue each speech chunk with the chat service."""

    # NOTE: an earlier, now-disabled variant collected chunks in a class-level
    # ``sync`` list and enqueued them in batches of ``sync_size = 64`` rather
    # than one chunk per call.

    async def process_job(self, job: ChatJob):
        """Enqueue the job's speech bytes, then yield the same job.

        Args:
            job: Incoming job whose ``data`` is a chunk of streamed speech
                bytes.

        Yields:
            The incoming ``job`` itself, after its chunk has been passed to
            ``enqueue_speech_bytes_to_play``.
        """
        speech_chunk = job.data
        # The service method receives a list, so wrap the single chunk.
        job.chat_service.enqueue_speech_bytes_to_play([speech_chunk])
        yield job
class ChatPipeline:
    """Wire ``Node1`` -> ``Node2`` -> ``Node3`` into one chat-to-speech pipeline.

    The object owns an ``AudioStreamProcessor`` and closes it when used as a
    context manager (``with ChatPipeline() as p: ...``) or, as a fallback,
    when the object is finalized. Closing is idempotent.
    """

    def __init__(self):
        # Load .env settings before the services below are constructed.
        load_dotenv()
        self.pipeline = Pipeline()
        self.audio_processor = AudioStreamProcessor()
        self.chat_service = StreamingChatService(self.audio_processor, voice_id="2OviOUQc1JsQRQgNkVBj") # Chales003

    def __enter__(self):
        """Return this pipeline so it can be bound by ``with ... as``."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the audio processor; exceptions from the body propagate."""
        self._close_audio_processor()

    def __del__(self):
        """Fallback cleanup for pipelines not used as a context manager."""
        self._close_audio_processor()

    def _close_audio_processor(self):
        """Close the audio processor once and drop the reference to it."""
        # getattr: __del__ also runs when __init__ raised before the
        # attribute was assigned, in which case there is nothing to close.
        processor = getattr(self, "audio_processor", None)
        if processor is None:
            # Already closed (e.g. __exit__ ran earlier) or never created.
            return
        try:
            processor.close()
        finally:
            # Clear the reference even if close() raised so that a later
            # __exit__/__del__ does not try to close it a second time.
            self.audio_processor = None

    async def start(self):
        """Create the inter-node queues and register the three nodes.

        Each node reads from its own queue and writes to the next node's
        queue; ``Node3`` is last and has no output queue.
        """
        self.node1_queue = asyncio.Queue()
        self.node2_queue = asyncio.Queue()
        self.node3_queue = asyncio.Queue()
        # Not read anywhere in this class; kept because it is a public
        # attribute that other code may reference.
        self.sync = []
        # Second argument (1) is presumably the worker count per node --
        # TODO confirm against Pipeline.add_node.
        await self.pipeline.add_node(Node1, 1, self.node1_queue, self.node2_queue, sequential_node=True)
        await self.pipeline.add_node(Node2, 1, self.node2_queue, self.node3_queue, sequential_node=True)
        await self.pipeline.add_node(Node3, 1, self.node3_queue, None, sequential_node=True)

    async def enqueue(self, prompt):
        """Wrap ``prompt`` in a ``ChatJob`` and submit it to the pipeline.

        Args:
            prompt: Input passed as the job's ``data``; the job shares this
                pipeline's chat service.
        """
        job = ChatJob(prompt, self.chat_service)
        await self.pipeline.enqueue_job(job)

    async def wait_until_all_jobs_idle(self):
        """Placeholder that never returns.

        It does not inspect the pipeline yet: it sleeps in 0.1 s steps
        forever, so awaiting it only ends when the task is cancelled.
        """
        # TODO - implement this
        while True:
            await asyncio.sleep(0.1)