sohojoe commited on
Commit
a642a9f
1 Parent(s): 4904f3e

fix audio issue when starting a new response

Browse files
app_interface_actor.py CHANGED
@@ -1,16 +1,32 @@
 
1
  import ray
2
- from ray.util.queue import Queue
3
  from ray.actor import ActorHandle
4
- import torch
5
  import numpy as np
6
  import pid_helper
7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8
  @ray.remote
9
  class AppInterfaceActor:
10
  def __init__(self):
11
  self.audio_input_queue = Queue(maxsize=3000) # Adjust the size as needed
12
  self.video_input_queue = Queue(maxsize=10) # Adjust the size as needed
13
- self.audio_output_queue = Queue(maxsize=50) # Adjust the size as needed
 
14
  self.video_output_queue = Queue(maxsize=10) # Adjust the size as needed
15
  self.debug_str = ""
16
  self.state = "Initializing"
@@ -37,9 +53,14 @@ class AppInterfaceActor:
37
  await self.audio_input_queue.put_async(shared_buffer_ref)
38
 
39
  async def dequeue_audio_output_frame_async(self):
40
- if self.audio_output_queue.empty():
41
- return None
42
- frame = await self.audio_output_queue.get_async()
 
 
 
 
 
43
  return frame
44
 
45
  async def dequeue_video_output_frames_async(self):
@@ -51,9 +72,14 @@ class AppInterfaceActor:
51
  video_frames.append(shared_tensor)
52
  return video_frames
53
 
54
- # functions for application to dequeue input, enqueue output
55
  def get_audio_output_queue(self)->Queue:
56
  return self.audio_output_queue
 
 
 
 
 
57
 
58
  async def enqueue_video_output_frame(self, shared_tensor_ref):
59
  if self.video_output_queue.full():
 
1
+ import time
2
  import ray
3
+ from ray.util.queue import Queue, Empty
4
  from ray.actor import ActorHandle
 
5
  import numpy as np
6
  import pid_helper
7
 
8
+ # Ray Queues take ~0.5 seconds to spin up;
9
+ # this class creates a pool of queues to cycle through
10
+ class QueueFactory:
11
+ def __init__(self, max_size:int):
12
+ self.queues:[Queue] = []
13
+ self.queue_size = 5
14
+ self.max_size = max_size
15
+ while len(self.queues) < self.queue_size:
16
+ self.queues.append(Queue(maxsize=max_size))
17
+
18
+ def get_queue(self)->Queue:
19
+ queue = self.queues.pop(0)
20
+ self.queues.append(Queue(maxsize=self.max_size))
21
+ return queue
22
+
23
  @ray.remote
24
  class AppInterfaceActor:
25
  def __init__(self):
26
  self.audio_input_queue = Queue(maxsize=3000) # Adjust the size as needed
27
  self.video_input_queue = Queue(maxsize=10) # Adjust the size as needed
28
+ self.audio_output_queue_factory = QueueFactory(max_size=50)
29
+ self.audio_output_queue = self.audio_output_queue_factory.get_queue()
30
  self.video_output_queue = Queue(maxsize=10) # Adjust the size as needed
31
  self.debug_str = ""
32
  self.state = "Initializing"
 
53
  await self.audio_input_queue.put_async(shared_buffer_ref)
54
 
55
  async def dequeue_audio_output_frame_async(self):
56
+ start_time = time.time()
57
+ try:
58
+ frame = await self.audio_output_queue.get_async(block=False)
59
+ except Empty:
60
+ frame = None
61
+ elapsed_time = time.time() - start_time
62
+ if elapsed_time > 0.1:
63
+ print (f"dequeue_audio_output_frame_async time: {elapsed_time}. was empty: {frame is None}. frame type: {type(frame) if frame else str(0)}")
64
  return frame
65
 
66
  async def dequeue_video_output_frames_async(self):
 
72
  video_frames.append(shared_tensor)
73
  return video_frames
74
 
75
+ # functions for application to dequeue input, enqueue output
76
  def get_audio_output_queue(self)->Queue:
77
  return self.audio_output_queue
78
+
79
+ async def cycle_output_queue(self)->Queue:
80
+ self.audio_output_queue.shutdown(grace_period_s=0.1)
81
+ self.audio_output_queue = self.audio_output_queue_factory.get_queue()
82
+ return self.audio_output_queue
83
 
84
  async def enqueue_video_output_frame(self, shared_tensor_ref):
85
  if self.video_output_queue.full():
charles_app.py CHANGED
@@ -1,4 +1,3 @@
1
- import json
2
  import ray
3
  import time
4
  import asyncio
@@ -62,12 +61,12 @@ class CharlesApp:
62
  self.set_state("010 - Initialized")
63
 
64
  async def cancel_response_task(self):
65
- if self._respond_to_prompt_task is None:
66
- return
67
- await self._respond_to_prompt.terminate()
68
- self._respond_to_prompt_task.cancel()
69
- self._respond_to_prompt_task = None
70
- self._respond_to_prompt = None
71
 
72
  async def start(self):
73
  if self._needs_init:
@@ -164,7 +163,7 @@ class CharlesApp:
164
  if additional_prompt is not None:
165
  prompt = additional_prompt + ". " + prompt
166
  human_preview_text = f"👨❓ {prompt}"
167
- # await self.cancel_response_task() # TODO re-enable to interupt when user speaks
168
 
169
 
170
  # I chose to add each line of the response one at a time as they come in
 
 
1
  import ray
2
  import time
3
  import asyncio
 
61
  self.set_state("010 - Initialized")
62
 
63
  async def cancel_response_task(self):
64
+ if self._respond_to_prompt_task is not None:
65
+ await self._respond_to_prompt.terminate()
66
+ self._respond_to_prompt_task.cancel()
67
+ self._respond_to_prompt_task = None
68
+ self._respond_to_prompt = None
69
+ self._audio_output_queue = await self._app_interface_actor.cycle_output_queue.remote()
70
 
71
  async def start(self):
72
  if self._needs_init:
 
163
  if additional_prompt is not None:
164
  prompt = additional_prompt + ". " + prompt
165
  human_preview_text = f"👨❓ {prompt}"
166
+ await self.cancel_response_task()
167
 
168
 
169
  # I chose to add each line of the response one at a time as they come in
ffmpeg_converter.py CHANGED
@@ -1,7 +1,6 @@
1
-
2
  import asyncio
3
  import ray
4
- from ray.util.queue import Queue
5
 
6
  class FFMpegConverter:
7
  def __init__(self, output_queue: Queue, buffer_size: int = 1920, output_format: str='s16le'):
@@ -24,9 +23,19 @@ class FFMpegConverter:
24
  # If the pipe is broken, restart the process.
25
  await self.start_process()
26
  continue
 
 
27
  # print(f"FFMpegConverter: read {len(chunk)} bytes")
28
  chunk_ref = ray.put(chunk)
29
- await self.output_queue.put_async(chunk_ref)
 
 
 
 
 
 
 
 
30
 
31
  async def start_process(self):
32
  cmd = [
@@ -52,6 +61,8 @@ class FFMpegConverter:
52
  # print (f"input_pipe: {self.input_pipe}")
53
 
54
  async def push_chunk(self, chunk):
 
 
55
  try:
56
  self.input_pipe.write(chunk)
57
  await self.input_pipe.drain()
@@ -67,5 +78,3 @@ class FFMpegConverter:
67
  self.process.stdin.transport.close()
68
  self.process.kill()
69
  self.process.terminate()
70
- # while not self.output_queue.empty():
71
- # await self.output_queue.get_async()
 
 
1
  import asyncio
2
  import ray
3
+ from ray.util.queue import Queue, Full
4
 
5
  class FFMpegConverter:
6
  def __init__(self, output_queue: Queue, buffer_size: int = 1920, output_format: str='s16le'):
 
23
  # If the pipe is broken, restart the process.
24
  await self.start_process()
25
  continue
26
+ if self.running == False:
27
+ return
28
  # print(f"FFMpegConverter: read {len(chunk)} bytes")
29
  chunk_ref = ray.put(chunk)
30
+ keep_trying = True
31
+ while keep_trying:
32
+ try:
33
+ await self.output_queue.put_async(chunk_ref, timeout=0.01)
34
+ keep_trying = False
35
+ except Full:
36
+ if self.running == False:
37
+ return
38
+ await asyncio.sleep(0.01)
39
 
40
  async def start_process(self):
41
  cmd = [
 
61
  # print (f"input_pipe: {self.input_pipe}")
62
 
63
  async def push_chunk(self, chunk):
64
+ if self.running == False:
65
+ return
66
  try:
67
  self.input_pipe.write(chunk)
68
  await self.input_pipe.drain()
 
78
  self.process.stdin.transport.close()
79
  self.process.kill()
80
  self.process.terminate()
 
 
respond_to_prompt_async.py CHANGED
@@ -1,10 +1,7 @@
1
  from asyncio import Queue, TaskGroup
2
  import asyncio
3
- from contextlib import asynccontextmanager
4
-
5
  import ray
6
  from chat_service import ChatService
7
- # from local_speaker_service import LocalSpeakerService
8
  from text_to_speech_service import TextToSpeechService
9
  from response_state_manager import ResponseStateManager
10
  from ffmpeg_converter import FFMpegConverter
@@ -97,23 +94,23 @@ class RespondToPromptAsync:
97
 
98
  async def terminate(self):
99
  # Cancel tasks
 
100
  if self.task_group_tasks:
101
  for task in self.task_group_tasks:
102
  task.cancel()
 
103
  for task in self.sentence_tasks:
104
  task.cancel()
 
 
105
 
106
  # Close FFmpeg converter actor
107
  if self.ffmpeg_converter_task:
108
- self.ffmpeg_converter_task.cancel()
109
  await self.ffmpeg_converter.close()
 
110
  # ray.kill(self.ffmpeg_converter)
111
 
112
  # Flush all queues
113
- # TODO re-enable to interupt when user speaks
114
- # while not self.audio_output_queue.empty():
115
- # await self.audio_output_queue.get_async()
116
- # # await self.audio_output_queue.get_async(block=False)
117
  while not self.llm_sentence_queue.empty():
118
  self.llm_sentence_queue.get_nowait()
119
  while not self.speech_chunk_queue.empty():
 
1
  from asyncio import Queue, TaskGroup
2
  import asyncio
 
 
3
  import ray
4
  from chat_service import ChatService
 
5
  from text_to_speech_service import TextToSpeechService
6
  from response_state_manager import ResponseStateManager
7
  from ffmpeg_converter import FFMpegConverter
 
94
 
95
  async def terminate(self):
96
  # Cancel tasks
97
+ all_tasks = []
98
  if self.task_group_tasks:
99
  for task in self.task_group_tasks:
100
  task.cancel()
101
+ all_tasks.extend(self.task_group_tasks)
102
  for task in self.sentence_tasks:
103
  task.cancel()
104
+ all_tasks.extend(self.sentence_tasks)
105
+ await asyncio.gather(*all_tasks, return_exceptions=True)
106
 
107
  # Close FFmpeg converter actor
108
  if self.ffmpeg_converter_task:
 
109
  await self.ffmpeg_converter.close()
110
+ self.ffmpeg_converter_task.cancel()
111
  # ray.kill(self.ffmpeg_converter)
112
 
113
  # Flush all queues
 
 
 
 
114
  while not self.llm_sentence_queue.empty():
115
  self.llm_sentence_queue.get_nowait()
116
  while not self.speech_chunk_queue.empty():
streamlit_av_queue.py CHANGED
@@ -104,7 +104,12 @@ class StreamlitAVQueue:
104
  # print (f"frame: {frame.format.name}, {frame.layout.name}, {frame.sample_rate}, {frame.samples}")
105
  assert frame.format.bytes == 2
106
  assert frame.format.name == 's16'
 
 
107
  frame_as_bytes = await self.app_interface_actor.dequeue_audio_output_frame_async.remote()
 
 
 
108
  if frame_as_bytes:
109
  # print(f"frame_as_bytes: {len(frame_as_bytes)}")
110
  assert len(frame_as_bytes) == frame.samples * frame.format.bytes
 
104
  # print (f"frame: {frame.format.name}, {frame.layout.name}, {frame.sample_rate}, {frame.samples}")
105
  assert frame.format.bytes == 2
106
  assert frame.format.name == 's16'
107
+ import time
108
+ start_time = time.time()
109
  frame_as_bytes = await self.app_interface_actor.dequeue_audio_output_frame_async.remote()
110
+ elapsed_time = time.time() - start_time
111
+ if elapsed_time > 0.1:
112
+ print (f"app_interface_actor.dequeue_audio_output_frame_async() elapsed_time: {elapsed_time}")
113
  if frame_as_bytes:
114
  # print(f"frame_as_bytes: {len(frame_as_bytes)}")
115
  assert len(frame_as_bytes) == frame.samples * frame.format.bytes