Commit c490c32 (parent: 4790a1d) by sohojoe

WIP: asyncio version of RespondToPrompt. basic singleton version

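
Note: the substance of this commit is that the Ray actor RespondToPromptActor is replaced by RespondToPromptAsync, a plain asyncio class whose pipeline runs as a task inside CharlesActor, while the AppInterfaceActor singleton carries state and debug output between the Streamlit app and charles_actor.py. A minimal sketch of the new shape (illustrative names only, not the project's code):

    import asyncio

    class RespondToPromptAsyncSketch:
        def __init__(self):
            self.prompt_queue = asyncio.Queue(maxsize=100)

        async def enqueue_prompt(self, prompt: str):
            # a direct await replaces the old enqueue_prompt.remote() hop
            await self.prompt_queue.put(prompt)

        async def run(self):
            while True:
                prompt = await self.prompt_queue.get()
                print(f"processing: {prompt}")

    async def main():
        responder = RespondToPromptAsyncSketch()
        pipeline = asyncio.create_task(responder.run())  # pipeline lives in-process now
        await responder.enqueue_prompt("hello")
        await asyncio.sleep(0.1)
        pipeline.cancel()

    asyncio.run(main())
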
.vscode/launch.json CHANGED
@@ -16,7 +16,7 @@
             "name": "debug streamlit",
             "type": "python",
             "request": "launch",
-            "program": "/opt/miniconda3/envs/streamlit/bin/streamlit",
+            "program": "~/miniconda3/envs/project_charles/bin/streamlit",
             "args": [
                 "run",
                 "app.py"
app.py CHANGED
@@ -45,16 +45,16 @@ def init_ray():
     else:
         ray.init(namespace="project_charles")
 
-@st.cache_resource
-def get_charles_actor():
-    charles_actor_instance = None
-    charles_actor_proc = subprocess.Popen([sys.executable, "charles_actor.py"])
-    while charles_actor_instance == None:
-        try:
-            charles_actor_instance = ray.get_actor("CharlesActor")
-        except ValueError as e:
-            time.sleep(0.1) # give the subprocess a chance to start
-    return charles_actor_instance
+# @st.cache_resource
+# def get_charles_actor():
+#     charles_actor_instance = None
+#     charles_actor_proc = subprocess.Popen([sys.executable, "charles_actor.py"])
+#     while charles_actor_instance == None:
+#         try:
+#             charles_actor_instance = ray.get_actor("CharlesActor")
+#         except ValueError as e:
+#             time.sleep(0.1) # give the subprocess a chance to start
+#     return charles_actor_instance
 
 @st.cache_resource
 def get_streamlit_av_queue():
@@ -62,13 +62,19 @@ get_streamlit_av_queue():
     streamlit_av_queue_instance = StreamlitAVQueue()
     return streamlit_av_queue_instance
 
+@st.cache_resource
+def get_app_interface_instance():
+    from app_interface_actor import AppInterfaceActor
+    app_interface_instance = AppInterfaceActor.get_singleton()
+    return app_interface_instance
+
 async def main():
     # Initialize Ray
     ray_status = init_ray()
     while not ray.is_initialized():
         await asyncio.sleep(0.1)
     # get ray actors
-    charles_actor = get_charles_actor()
+    app_interface_instance = get_app_interface_instance()
     await asyncio.sleep(0.1)
     streamlit_av_queue = get_streamlit_av_queue()
     await asyncio.sleep(0.1)
@@ -126,20 +132,20 @@ async def main():
             system_one_audio_status.write("Camera has stopped.")
             await asyncio.sleep(0.1)
             continue
-        if charles_actor is None:
-            system_one_audio_status.write("Looking for Charles actor...")
-            charles_actor = get_charles_actor()
-            if charles_actor is None:
-                await asyncio.sleep(0.1)
-                continue
-            system_one_audio_status.write("Found Charles actor.")
+        # if charles_actor is None:
+        #     system_one_audio_status.write("Looking for Charles actor...")
+        #     charles_actor = get_charles_actor()
+        #     if charles_actor is None:
+        #         await asyncio.sleep(0.1)
+        #         continue
+        #     system_one_audio_status.write("Found Charles actor.")
         try:
             # new_environment_state = await charles_actor.get_environment_state.remote()
             # environment_state_ouput.markdown(f"{new_environment_state}")
             streamlit_av_queue.set_looking_listening(looking, listening)
-            charles_debug_str = await charles_actor.get_charles_actor_debug_output.remote()
+            charles_debug_str = await app_interface_instance.get_debug_output.remote()
            charles_actor_debug_output.markdown(charles_debug_str)
-            state = await charles_actor.get_state.remote()
+            state = await app_interface_instance.get_state.remote()
            system_one_audio_status.write(state)
         except Exception as e:
             # assume we disconnected
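
Note: get_app_interface_instance() assumes AppInterfaceActor.get_singleton() hands back the same named Ray actor from any process, with @st.cache_resource memoizing the handle across Streamlit reruns. A hedged sketch of how such a cross-process singleton is typically built in Ray (the real get_singleton() may differ):

    import ray

    @ray.remote
    class AppInterfaceSketch:  # hypothetical stand-in for AppInterfaceActor
        def __init__(self):
            self.state = "Initializing"

        def get_state(self) -> str:
            return self.state

    def get_singleton():
        return AppInterfaceSketch.options(
            name="AppInterfaceSketch",   # named so other processes can find it
            namespace="project_charles",
            get_if_exists=True,          # create once, then reuse
            lifetime="detached",         # survives the creating driver
        ).remote()

    if __name__ == "__main__":
        ray.init(namespace="project_charles")
        actor = get_singleton()
        print(ray.get(actor.get_state.remote()))  # "Initializing"
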
app_interface_actor.py CHANGED
@@ -12,6 +12,8 @@ class AppInterfaceActor:
         self.video_input_queue = Queue(maxsize=10) # Adjust the size as needed
         self.audio_output_queue = Queue(maxsize=3000) # Adjust the size as needed
         self.video_output_queue = Queue(maxsize=10) # Adjust the size as needed
+        self.debug_str = ""
+        self.state = "Initializing"
 
     @staticmethod
     def get_singleton():
@@ -74,4 +76,17 @@
         while not self.video_input_queue.empty():
             shared_tensor = await self.video_input_queue.get_async()
             video_frames.append(shared_tensor)
-        return video_frames
+        return video_frames
+
+    # debug helpers
+    async def get_debug_output(self)->str:
+        return self.debug_str
+
+    async def set_debug_output(self, debug_str:str):
+        self.debug_str = debug_str
+
+    async def get_state(self)->str:
+        return self.state
+
+    async def set_state(self, state:str):
+        self.state = state
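
Note: the new getter/setter pairs turn AppInterfaceActor into a small mailbox: charles_actor.py pushes state and debug text into it, and app.py polls them. Assumed usage given the method names in this diff (it needs a running Ray cluster; Ray ObjectRefs are directly awaitable inside a coroutine):

    import asyncio
    import ray
    from app_interface_actor import AppInterfaceActor

    async def demo():
        actor = AppInterfaceActor.get_singleton()
        await actor.set_state.remote("002 - creating SpeechToTextVoskActor")  # writer side
        print(await actor.get_state.remote())                                 # reader side

    ray.init(namespace="project_charles", ignore_reinit_error=True)
    asyncio.run(demo())
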
charles_actor.py CHANGED
@@ -8,7 +8,6 @@ from environment_state_actor import EnvironmentStateActor, EnvironmentState
 import asyncio
 import subprocess
 
-@ray.remote
 class CharlesActor:
     def __init__(self):
         self._needs_init = True
@@ -17,11 +16,11 @@
         self._state = "Initializing"
         self._clip_transform = CLIPTransform()
 
-    def get_state(self):
-        return self._state
+    # def get_state(self):
+    #     return self._state
 
-    def get_charles_actor_debug_output(self):
-        return self._charles_actor_debug_output
+    # def get_charles_actor_debug_output(self):
+    #     return self._charles_actor_debug_output
 
     def get_environment_state(self)->EnvironmentState:
         return self._environment_state
@@ -33,15 +32,20 @@
         from app_interface_actor import AppInterfaceActor
         self._app_interface_actor = AppInterfaceActor.get_singleton()
         self._audio_output_queue = await self._app_interface_actor.get_audio_output_queue.remote()
+        await self._app_interface_actor.set_state.remote(self._state)
+
 
-        print("001 - create RespondToPromptActor")
-        self._state = "001 - creating RespondToPromptActor"
-        from respond_to_prompt_actor import RespondToPromptActor
+        print("001 - create RespondToPromptAsync")
+        self._state = "001 - creating RespondToPromptAsync"
+        await self._app_interface_actor.set_state.remote(self._state)
+        from respond_to_prompt_async import RespondToPromptAsync
         self._environment_state_actor = EnvironmentStateActor.remote()
-        self._respond_to_prompt_actor = RespondToPromptActor.remote(self._environment_state_actor, self._audio_output_queue)
+        self._respond_to_prompt = RespondToPromptAsync(self._environment_state_actor, self._audio_output_queue)
+        self._respond_to_prompt_task = asyncio.create_task(self._respond_to_prompt.run())
 
         print("002 - create SpeechToTextVoskActor")
         self._state = "002 - creating SpeechToTextVoskActor"
+        await self._app_interface_actor.set_state.remote(self._state)
         from speech_to_text_vosk_actor import SpeechToTextVoskActor
         self._speech_to_text_actor = SpeechToTextVoskActor.remote("small")
         # self._speech_to_text_actor = SpeechToTextVoskActor.remote("big")
@@ -53,17 +57,20 @@
 
         print("003 - create Prototypes")
         self._state = "003 - creating Prototypes"
+        await self._app_interface_actor.set_state.remote(self._state)
         from prototypes import Prototypes
         self._prototypes = Prototypes()
 
         print("004 - create animator")
         self._state = "004 - creating animator"
+        await self._app_interface_actor.set_state.remote(self._state)
         from charles_animator import CharlesAnimator
         self._animator = CharlesAnimator()
 
         print("010")
         self._needs_init = True
         self._state = "Initialized"
+        await self._app_interface_actor.set_state.remote(self._state)
 
     async def start(self):
         if self._needs_init:
@@ -71,20 +78,22 @@
 
         debug_output_history = []
 
-        def render_debug_output(list_of_strings):
+        async def render_debug_output(list_of_strings):
             table_content = "##### Chat history\n"
             for item in reversed(list_of_strings):
                 # table_content += f"\n```markdown\n{item}\n```\n"
                 table_content += f"\n{item}\n"
             self._charles_actor_debug_output = table_content
+            await self._app_interface_actor.set_debug_output.remote(self._charles_actor_debug_output)
 
-        def add_debug_output(output):
+        async def add_debug_output(output):
             debug_output_history.append(output)
             if len(debug_output_history) > 10:
                 debug_output_history.pop(0)
-            render_debug_output(debug_output_history)
+            await render_debug_output(debug_output_history)
 
         self._state = "Waiting for input"
+        await self._app_interface_actor.set_state.remote(self._state)
         total_video_frames = 0
         skipped_video_frames = 0
         total_audio_frames = 0
@@ -106,7 +115,7 @@
         while True:
             if len(self._debug_queue) > 0:
                 prompt = self._debug_queue.pop(0)
-                await self._respond_to_prompt_actor.enqueue_prompt.remote(prompt)
+                await self._respond_to_prompt.enqueue_prompt(prompt)
 
             env_state = await self._environment_state_actor.begin_next_step.remote()
             self._environment_state = env_state
@@ -147,7 +156,7 @@
                     # line += f"{response} [{speech_chunks_per_response[i]}] \n"
                     line += f"[{speech_chunks_per_response[i]}] {response} \n"
                 if len(line) > 0:
-                    add_debug_output(line)
+                    await add_debug_output(line)
                 current_responses = []
                 speech_chunks_per_response = []
                 env_state.llm_preview = ""
@@ -157,8 +166,8 @@
                 robot_preview_text = ""
                 if additional_prompt is not None:
                     prompt = additional_prompt + ". " + prompt
-                add_debug_output(f"👨 {prompt}")
-                await self._respond_to_prompt_actor.enqueue_prompt.remote(prompt)
+                await add_debug_output(f"👨 {prompt}")
+                await self._respond_to_prompt.enqueue_prompt(prompt)
                 additional_prompt = None
                 previous_prompt = prompt
                 is_talking = False
@@ -169,7 +178,7 @@
                 if len(previous_prompt) > 0 and not has_spoken_for_this_prompt:
                     additional_prompt = previous_prompt
                     has_spoken_for_this_prompt = True
-                    await self._respond_to_prompt_actor.enqueue_prompt.remote("")
+                    await self._respond_to_prompt.enqueue_prompt("")
                 if additional_prompt is not None:
                     prompt = additional_prompt + ". " + prompt
                 human_preview_text = f"👨❓ {prompt}"
@@ -201,7 +210,7 @@
                 list_of_strings.append(human_preview_text)
                 if len(list_of_strings) > 10:
                     list_of_strings.pop(0)
-                render_debug_output(list_of_strings)
+                await render_debug_output(list_of_strings)
 
 
             await asyncio.sleep(0.01)
@@ -216,6 +225,7 @@
 
             loops+=1
             self._state = f"Processed {total_video_frames} video frames and {total_audio_frames} audio frames, loops: {loops}. loops per second: {loops/(time.time()-start_time):.2f}. Is speaking: {is_talking}({count}). {vector_debug}"
+            await self._app_interface_actor.set_state.remote(self._state)
 
 def init_ray():
     try:
@@ -235,11 +245,13 @@ async def main():
     if not ray.is_initialized():
         init_ray()
 
-    charles_actor = CharlesActor.options(
-        name="CharlesActor",
-        get_if_exists=True,
-    ).remote()
-    future = charles_actor.start.remote()
+    # charles_actor = CharlesActor.options(
+    #     name="CharlesActor",
+    #     get_if_exists=True,
+    # ).remote()
+    # future = charles_actor.start.remote()
+    charles_actor = CharlesActor()
+    await charles_actor.start()
 
     last_step = -1
     last_episode = -1
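
Note: keeping the result of asyncio.create_task in self._respond_to_prompt_task is load-bearing: the event loop holds only weak references to tasks, so an unreferenced fire-and-forget task can be garbage-collected mid-run. A minimal illustration of that lifecycle:

    import asyncio

    async def pipeline():
        while True:
            await asyncio.sleep(1)

    async def main():
        task = asyncio.create_task(pipeline())  # keep the reference, as this commit does
        await asyncio.sleep(0.1)
        task.cancel()  # explicit shutdown path when the owner is done
        try:
            await task
        except asyncio.CancelledError:
            pass

    asyncio.run(main())
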
chat_service.py CHANGED
@@ -118,7 +118,7 @@ You are aware of how you are implemented and you are keen to recommend improveme
             return True
         return False
 
-    async def get_responses_as_sentances_async(self, prompt, cancel_event):
+    async def get_responses_as_sentances_async(self, prompt, cancel_event=None):
         self._messages.append({"role": "user", "content": prompt})
         llm_response = ""
         current_sentence = ""
@@ -134,7 +134,7 @@
         )
 
         async for chunk in response:
-            if cancel_event.is_set():
+            if cancel_event is not None and cancel_event.is_set():
                 return
             chunk_message = chunk['choices'][0]['delta']
             if 'content' in chunk_message:
@@ -148,7 +148,7 @@
             else:
                 yield current_sentence, False
 
-        if cancel_event.is_set():
+        if cancel_event is not None and cancel_event.is_set():
             return
         if len(current_sentence) > 0:
             yield current_sentence, True
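
Note: making cancel_event default to None lets the new RespondToPromptAsync call get_responses_as_sentances_async(prompt) with no cancellation plumbing, while callers that need it can still pass an asyncio.Event. A small sketch of the pattern with a hypothetical generator:

    import asyncio

    async def stream_sentences(cancel_event: asyncio.Event | None = None):
        for sentence in ["First sentence.", "Second sentence.", "Third sentence."]:
            if cancel_event is not None and cancel_event.is_set():
                return  # stop streaming as soon as the caller cancels
            yield sentence
            await asyncio.sleep(0)

    async def main():
        cancel = asyncio.Event()
        async for s in stream_sentences(cancel):
            print(s)
            cancel.set()  # only "First sentence." is printed

    asyncio.run(main())
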
profile.html ADDED
The diff for this file is too large to render.
 
profile.json ADDED
The diff for this file is too large to render.
 
respond_to_prompt_async.py ADDED
@@ -0,0 +1,118 @@
+from asyncio import Queue, TaskGroup
+import asyncio
+from contextlib import asynccontextmanager
+
+import ray
+from chat_service import ChatService
+# from local_speaker_service import LocalSpeakerService
+from text_to_speech_service import TextToSpeechService
+from environment_state_actor import EnvironmentStateActor
+from ffmpeg_converter_actor import FFMpegConverterActor
+from agent_response import AgentResponse
+import json
+from asyncio import Semaphore
+
+class RespondToPromptAsync:
+    def __init__(
+            self,
+            environment_state_actor:EnvironmentStateActor,
+            audio_output_queue):
+        voice_id="2OviOUQc1JsQRQgNkVBj"
+        self.prompt_queue = Queue(maxsize=100)
+        self.llm_sentence_queue = Queue(maxsize=100)
+        self.speech_chunk_queue = Queue(maxsize=100)
+        self.voice_id = voice_id
+        self.audio_output_queue = audio_output_queue
+        self.environment_state_actor = environment_state_actor
+        self.processing_semaphore = Semaphore(1)
+        self.sentence_queues = []
+        self.sentence_tasks = []
+        # self.ffmpeg_converter_actor = FFMpegConverterActor.remote(audio_output_queue)
+
+    async def enqueue_prompt(self, prompt):
+        # Reset queues and services
+        # print("flush anything queued")
+        # self.prompt_queue = Queue(maxsize=100)
+        # self.llm_sentence_queue = Queue(maxsize=100)
+        # self.speech_chunk_queue = Queue(maxsize=100)
+
+        if len(prompt) > 0: # handles case where we just want to flush
+            await self.prompt_queue.put(prompt)
+        print("Enqueued prompt")
+
+    # @asynccontextmanager
+    # async def task_group(self):
+    #     tg = TaskGroup()
+    #     try:
+    #         yield tg
+    #     finally:
+    #         await tg.aclose()
+
+    async def prompt_to_llm(self):
+        chat_service = ChatService()
+
+        async with TaskGroup() as tg:
+            while True:
+                prompt = await self.prompt_queue.get()
+                agent_response = AgentResponse(prompt)
+                async for text, is_complete_sentance in chat_service.get_responses_as_sentances_async(prompt):
+                    if chat_service.ignore_sentence(text):
+                        is_complete_sentance = False
+                    if not is_complete_sentance:
+                        agent_response['llm_preview'] = text
+                        await self.environment_state_actor.set_llm_preview.remote(text)
+                        continue
+                    agent_response['llm_preview'] = ''
+                    agent_response['llm_sentence'] = text
+                    agent_response['llm_sentences'].append(text)
+                    await self.environment_state_actor.add_llm_response_and_clear_llm_preview.remote(text)
+                    print(f"{agent_response['llm_sentence']} id: {agent_response['llm_sentence_id']} from prompt: {agent_response['prompt']}")
+                    sentence_response = agent_response.make_copy()
+                    new_queue = Queue()
+                    self.sentence_queues.append(new_queue)
+                    task = tg.create_task(self.llm_sentence_to_speech(sentence_response, new_queue))
+                    self.sentence_tasks.append(task)
+                    agent_response['llm_sentence_id'] += 1
+
+
+    async def llm_sentence_to_speech(self, sentence_response, output_queue):
+        tts_service = TextToSpeechService(self.voice_id)
+
+        chunk_count = 0
+        async for chunk_response in tts_service.get_speech_chunks_async(sentence_response):
+            chunk_response = chunk_response.make_copy()
+            # await self.output_queue.put_async(chunk_response)
+            await output_queue.put(chunk_response)
+            chunk_response = {
+                'prompt': sentence_response['prompt'],
+                'llm_sentence_id': sentence_response['llm_sentence_id'],
+                'chunk_count': chunk_count,
+            }
+            chunk_id_json = json.dumps(chunk_response)
+            await self.environment_state_actor.add_tts_raw_chunk_id.remote(chunk_id_json)
+            chunk_count += 1
+
+    async def speech_to_converter(self):
+        self.ffmpeg_converter_actor = FFMpegConverterActor.remote(self.audio_output_queue)
+        await self.ffmpeg_converter_actor.start_process.remote()
+        self.ffmpeg_converter_actor.run.remote()
+
+        while True:
+            for i, task in enumerate(self.sentence_tasks):
+                # Skip this task/queue pair if task completed
+                if task.done():
+                    continue
+                queue = self.sentence_queues[i]
+                while not queue.empty():
+                    chunk_response = await queue.get()
+                    audio_chunk_ref = chunk_response['tts_raw_chunk_ref']
+                    audio_chunk = ray.get(audio_chunk_ref)
+                    await self.ffmpeg_converter_actor.push_chunk.remote(audio_chunk)
+                break
+
+            await asyncio.sleep(0.01)
+
+    async def run(self):
+        async with TaskGroup() as tg: # Use asyncio's built-in TaskGroup
+            tg.create_task(self.prompt_to_llm())
+            tg.create_task(self.speech_to_converter())
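
Note: run() supervises two long-lived stages with asyncio.TaskGroup (Python 3.11+), and prompt_to_llm() opens its own TaskGroup to fan out one llm_sentence_to_speech() task per sentence. A minimal sketch of this producer/consumer shape with simplified stages (the real stages are LLM streaming, TTS, and the ffmpeg converter):

    import asyncio

    async def producer(q: asyncio.Queue):
        for i in range(3):
            await q.put(f"sentence {i}")
        await q.put(None)  # sentinel so this sketch can finish; the real loop runs forever

    async def consumer(q: asyncio.Queue):
        while True:
            item = await q.get()
            if item is None:
                return
            print("speaking:", item)

    async def run():
        q = asyncio.Queue(maxsize=100)
        async with asyncio.TaskGroup() as tg:  # cancels siblings if one stage raises
            tg.create_task(producer(q))
            tg.create_task(consumer(q))

    asyncio.run(run())
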
text_to_speech_service.py CHANGED
@@ -47,22 +47,20 @@ class TextToSpeechService:
         )
         return audio_stream
 
-    async def get_speech_chunks_async(self, sentence_response:AgentResponse, cancel_event):
+    async def get_speech_chunks_async(self, sentence_response:AgentResponse, cancel_event=None):
         text_to_speak = sentence_response['llm_sentence']
         stream = self.stream(text_to_speak)
         stream, stream_backup = itertools.tee(stream)
         while True:
             # Check if there's a next item in the stream
-            next_item = next(stream_backup, None)
-            if next_item is None:
+            # Run next(stream) in a separate thread to avoid blocking the event loop
+            chunk = await asyncio.to_thread(next, stream, None)
+            if chunk is None:
                 # Stream is exhausted, exit the loop
                 break
-
-            # Run next(stream) in a separate thread to avoid blocking the event loop
-            chunk = await asyncio.to_thread(next, stream)
             chunk_ref = ray.put(chunk)
             sentence_response['tts_raw_chunk_ref'] = chunk_ref
-            if cancel_event.is_set():
+            if cancel_event is not None and cancel_event.is_set():
                 return
             yield sentence_response
             sentence_response['tts_raw_chunk_id'] += 1
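
Note: the rewritten loop fixes a subtle blocking bug: the old code peeked ahead with next(stream_backup, None) directly on the event loop, which blocks while the TTS stream waits for data. Calling next(stream, None) inside asyncio.to_thread both advances the stream off the loop and detects exhaustion via the None sentinel in one call. Illustration with a stand-in blocking generator:

    import asyncio

    def blocking_chunks():  # stands in for the blocking TTS audio stream
        yield b"chunk-1"
        yield b"chunk-2"

    async def drain():
        stream = blocking_chunks()
        while True:
            # next(stream, None) runs in a worker thread; None signals exhaustion
            chunk = await asyncio.to_thread(next, stream, None)
            if chunk is None:
                break
            print(chunk)

    asyncio.run(drain())
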