import asyncio
import itertools
import json
import os

import torch
import openai

from audio_stream_processor import AudioStreamProcessor
from speech_service import SpeechService


class StreamingChatService:
    """Stream chat completions and hand them to speech synthesis sentence by sentence.

    The service keeps the running conversation in ``self._messages`` and
    splits the streamed model output into sentence-sized pieces so that speech
    can start before the whole reply has arrived.

    It uses the legacy ``openai.ChatCompletion`` streaming interface.
    """

    # Characters that may end a sentence.
    _SENTENCE_TERMINATORS = (".", "?", "!")
    # Characters that may directly follow a terminator and still belong to the
    # sentence, e.g. the quote in: He said "stop."
    _CLOSING_BRACKETS = ('"', ")", "]")

    def __init__(
        self,
        audio_processor: "AudioStreamProcessor | None" = None,
        api="openai",
        model_id="gpt-3.5-turbo",
        voice_id="Bella",
    ):
        """Create the service and start with an empty conversation.

        Args:
            audio_processor: Object whose ``add_audio_stream`` receives the
                synthesized speech. May be ``None``; ``respond_to`` then
                produces text only.
            api: Stored as ``self._api``; not otherwise used in this class.
            model_id: Model name passed to the chat completion calls.
            voice_id: Voice passed to ``SpeechService``.
        """
        self._audio_processor = audio_processor
        self._speech_service = SpeechService(voice_id=voice_id)
        self._api = api
        self._device = "cuda:0" if torch.cuda.is_available() else "cpu"
        self._system_prompt = None
        openai.api_key = os.getenv("OPENAI_API_KEY")
        self._model_id = model_id
        self.reset()

    def reset(self):
        """Clear the conversation, re-adding the system prompt if one is set."""
        self._messages = []
        if self._system_prompt:
            self._messages.append({"role": "system", "content": self._system_prompt})

    def _should_we_send_to_voice(self, sentence):
        """Return the speakable prefix of ``sentence``, or ``None`` to keep buffering.

        A prefix is returned only when a sentence terminator is present AND
        text has already continued past it. A buffer that currently ends in a
        terminator or a closing bracket is held back, because the next chunk
        may still extend that sentence (for example ``...`` or ``.")``).

        Args:
            sentence: Text accumulated so far and not yet spoken.

        Returns:
            ``sentence`` up to and including the last terminator and any
            closing brackets directly after it, or ``None``.
        """
        last_terminator = max(sentence.rfind(char) for char in self._SENTENCE_TERMINATORS)
        if last_terminator == -1:
            # No terminator anywhere (this also covers the empty string).
            return None

        if sentence.endswith(self._SENTENCE_TERMINATORS + self._CLOSING_BRACKETS):
            # The sentence may not be finished yet; wait for more text.
            return None

        # Include closing brackets that directly follow the terminator.
        end = last_terminator + 1
        while end < len(sentence) and sentence[end] in self._CLOSING_BRACKETS:
            end += 1
        return sentence[:end]

    def ignore_sentence(self, text_to_speak):
        """Return ``True`` when the text contains no letter and no digit.

        This covers empty strings, pure whitespace and punctuation-only
        fragments such as a single bracket.
        """
        return not any(char.isalpha() or char.isdigit() for char in text_to_speak)

    def _safe_enqueue_text_to_speak(self, text_to_speak):
        """Synthesize ``text_to_speak`` and queue the result for playback.

        Nothing happens when the text has no speakable content or when no
        audio processor was supplied to the constructor.
        """
        if self.ignore_sentence(text_to_speak):
            return
        if self._audio_processor is None:
            # The constructor default is None; skip synthesis and playback
            # instead of failing with AttributeError on None.
            return
        stream = self._speech_service.stream(text_to_speak)
        self._audio_processor.add_audio_stream(stream)

    @staticmethod
    def _chunk_text(chunk):
        """Return the text carried by one streamed completion chunk, or ``""``.

        Deltas without a ``content`` key, or with ``content`` set to ``None``,
        yield an empty string.
        """
        return chunk["choices"][0]["delta"].get("content") or ""

    def respond_to(self, prompt):
        """Send ``prompt`` to the model, speak the reply as it streams in, and return it.

        Each completed sentence is queued for speech and printed. The user
        prompt and the full assistant reply are appended to the conversation.

        Args:
            prompt: The user's message.

        Returns:
            The complete assistant reply as a single string.
        """
        self._messages.append({"role": "user", "content": prompt})
        response = openai.ChatCompletion.create(
            model=self._model_id,
            messages=self._messages,
            temperature=1.0,  # set to 0.0 for debugging/deterministic results
            stream=True,
        )

        reply_parts = []
        pending = ""  # text received but not yet sent to speech
        for chunk in response:
            chunk_text = self._chunk_text(chunk)
            if not chunk_text:
                continue
            reply_parts.append(chunk_text)
            pending += chunk_text
            text_to_speak = self._should_we_send_to_voice(pending)
            if text_to_speak:
                self._safe_enqueue_text_to_speak(text_to_speak)
                print(text_to_speak)
                pending = pending[len(text_to_speak):]

        # Flush whatever is left once the stream has ended.
        if pending:
            self._safe_enqueue_text_to_speak(pending)
            print(pending)

        agent_response = "".join(reply_parts)
        self._messages.append({"role": "assistant", "content": agent_response})
        return agent_response

    async def get_responses_as_sentances_async(self, prompt):
        """Asynchronously yield the model's reply to ``prompt`` one sentence at a time.

        The user prompt is appended to the conversation immediately. The
        assistant reply is appended only after the generator has been
        consumed to the end.

        Args:
            prompt: The user's message.

        Yields:
            Sentence-sized pieces of the reply, in order. The final piece is
            whatever text remains when the stream ends.
        """
        self._messages.append({"role": "user", "content": prompt})
        response = await openai.ChatCompletion.acreate(
            model=self._model_id,
            messages=self._messages,
            temperature=1.0,  # set to 0.0 for debugging/deterministic results
            stream=True,
        )

        reply_parts = []
        pending = ""  # text received but not yet yielded
        async for chunk in response:
            chunk_text = self._chunk_text(chunk)
            if not chunk_text:
                continue
            reply_parts.append(chunk_text)
            pending += chunk_text
            text_to_speak = self._should_we_send_to_voice(pending)
            if text_to_speak:
                yield text_to_speak
                pending = pending[len(text_to_speak):]

        if pending:
            yield pending

        self._messages.append({"role": "assistant", "content": "".join(reply_parts)})

    async def get_speech_chunks_async(self, text_to_speak):
        """Asynchronously yield the speech stream items for ``text_to_speak``.

        Every ``next()`` on the speech stream runs in a worker thread so the
        event loop is not blocked while an item is being produced. Iteration
        stops when the stream is exhausted or produces ``None``.

        Args:
            text_to_speak: Text to synthesize.

        Yields:
            Items produced by ``SpeechService.stream`` (presumably audio
            byte chunks - confirm against the speech service).
        """
        stream = iter(self._speech_service.stream(text_to_speak))
        while True:
            # The default makes next() return None on exhaustion instead of
            # raising StopIteration, which cannot propagate through a future.
            chunk = await asyncio.to_thread(next, stream, None)
            if chunk is None:
                break
            yield chunk

    def enqueue_speech_bytes_to_play(self, speech_bytes):
        """Queue already-synthesized speech on the audio processor.

        Requires that an audio processor was supplied to the constructor.
        """
        self._audio_processor.add_audio_stream(speech_bytes)