sohojoe committed
Commit aec6f97
Parent: d9da748

refactor: in/out_audio/video to audio/video_input/output
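
Pure rename across charles_actor.py, respond_to_prompt_actor.py, streamlit_av_queue.py, and webrtc_av_queue_actor.py: the in_/out_ prefixes on the audio and video queues, frames, and accessor methods become audio_/video_ names with _input/_output suffixes (e.g. out_audio_queue -> audio_output_queue, enqueue_in_video_frame -> enqueue_video_input_frame). No behavior changes.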

charles_actor.py CHANGED
@@ -33,15 +33,15 @@ class CharlesActor:
         self._state = "000 - creating StreamlitAVQueue"
         from streamlit_av_queue import StreamlitAVQueue
         self._streamlit_av_queue = StreamlitAVQueue()
-        self._out_audio_queue = await self._streamlit_av_queue.get_out_audio_queue()
-        self._out_video_queue = await self._streamlit_av_queue.get_out_video_queue()
+        self._audio_output_queue = await self._streamlit_av_queue.get_audio_output_queue()
+        self._video_output_queue = await self._streamlit_av_queue.get_video_output_queue()

         print("001 - create RespondToPromptActor")
         self._state = "001 - creating RespondToPromptActor"
         from respond_to_prompt_actor import RespondToPromptActor
         self._environment_state_actor = EnvironmentStateActor.remote()
         self._agent_state_actor = AgentStateActor.remote()
-        self._respond_to_prompt_actor = RespondToPromptActor.remote(self._environment_state_actor, self._out_audio_queue)
+        self._respond_to_prompt_actor = RespondToPromptActor.remote(self._environment_state_actor, self._audio_output_queue)

         print("002 - create SpeechToTextVoskActor")
         self._state = "002 - creating SpeechToTextVoskActor"
@@ -114,7 +114,7 @@ class CharlesActor:
             env_state = await self._environment_state_actor.begin_next_step.remote()
             self._environment_state = env_state
             self._agent_state_actor.begin_step.remote()
-            audio_frames = await self._streamlit_av_queue.get_in_audio_frames_async()
+            audio_frames = await self._streamlit_av_queue.get_audio_input_frames_async()
             video_frames = await self._streamlit_av_queue.get_video_frames_async()

             if len(audio_frames) > 0:
@@ -211,15 +211,15 @@ class CharlesActor:
                 await asyncio.sleep(0.01)

             # add observations to the environment state
-            count = len(self._out_audio_queue)
+            count = len(self._audio_output_queue)
             is_talking = bool(count > 0)
             has_spoken_for_this_prompt = has_spoken_for_this_prompt or is_talking
             frame = self._animator.update(is_talking)
-            if self._out_video_queue.full():
-                evicted_item = await self._out_video_queue.get_async()
+            if self._video_output_queue.full():
+                evicted_item = await self._video_output_queue.get_async()
                 del evicted_item
             frame_ref = ray.put(frame)
-            await self._out_video_queue.put_async(frame_ref)
+            await self._video_output_queue.put_async(frame_ref)

             loops+=1
             self._state = f"Processed {total_video_frames} video frames and {total_audio_frames} audio frames, loops: {loops}. loops per second: {loops/(time.time()-start_time):.2f}. Is speaking: {is_talking}({count}). {vector_debug}"
respond_to_prompt_actor.py CHANGED
@@ -144,14 +144,14 @@ class RespondToPromptActor:
     def __init__(
             self,
             environment_state_actor:EnvironmentStateActor,
-            out_audio_queue):
+            audio_output_queue):
         voice_id="2OviOUQc1JsQRQgNkVBj"
         self.prompt_queue = Queue(maxsize=100)
         self.llm_sentence_queue = Queue(maxsize=100)
         self.speech_chunk_queue = Queue(maxsize=100)
         self.environment_state_actor = environment_state_actor

-        self.ffmpeg_converter_actor = FFMpegConverterActor.remote(out_audio_queue)
+        self.ffmpeg_converter_actor = FFMpegConverterActor.remote(audio_output_queue)

         self.prompt_to_llm = PromptToLLMActor.remote(
             self.environment_state_actor,
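
Beyond the parameter rename, this constructor shows the handle-passing pattern the new names describe: the audio output queue is created elsewhere and handed into FFMpegConverterActor, so converter and consumer share one buffer. A hedged sketch with a hypothetical stand-in actor (ConverterStub is not the repo's class):

import ray
from ray.util.queue import Queue  # assumed; matches the Queue(maxsize=...) usage above


@ray.remote
class ConverterStub:  # hypothetical stand-in for FFMpegConverterActor
    def __init__(self, audio_output_queue: Queue):
        self.audio_output_queue = audio_output_queue  # shared handle

    def push(self, chunk: bytes) -> None:
        self.audio_output_queue.put(chunk)


ray.init()
audio_output_queue = Queue(maxsize=3000)
converter = ConverterStub.remote(audio_output_queue)
ray.get(converter.push.remote(b"\x00" * 4))
print(audio_output_queue.get())  # the creator sees the actor's write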
streamlit_av_queue.py CHANGED
@@ -23,7 +23,7 @@ class StreamlitAVQueue:
             name="WebRtcAVQueueActor",
             get_if_exists=True,
         ).remote()
-        self._out_video_frame = None
+        self._video_output_frame = None

     def set_looking_listening(self, looking, listening: bool):
         with self._lock:
@@ -38,16 +38,16 @@ class StreamlitAVQueue:
         try:
             with self._lock:
                 should_look = self._looking
-                next_out_video_frame = await self.queue_actor.get_out_video_frame.remote()
-                if next_out_video_frame is not None:
-                    self._out_video_frame = next_out_video_frame
+                next_video_output_frame = await self.queue_actor.get_video_output_frame.remote()
+                if next_video_output_frame is not None:
+                    self._video_output_frame = next_video_output_frame
             for i, frame in enumerate(frames):
                 user_image = frame.to_ndarray(format="rgb24")
                 if should_look:
                     shared_tensor_ref = ray.put(user_image)
-                    await self.queue_actor.enqueue_in_video_frame.remote(shared_tensor_ref)
-                if self._out_video_frame is not None:
-                    frame = self._out_video_frame
+                    await self.queue_actor.enqueue_video_input_frame.remote(shared_tensor_ref)
+                if self._video_output_frame is not None:
+                    frame = self._video_output_frame
                     # resize user image to 1/4 size
                     user_frame = cv2.resize(user_image, (user_image.shape[1]//4, user_image.shape[0]//4), interpolation=cv2.INTER_AREA)
                     # flip horizontally
@@ -85,7 +85,7 @@ class StreamlitAVQueue:
                     sound_chunk += sound
                 shared_buffer = np.array(sound_chunk.get_array_of_samples())
                 shared_buffer_ref = ray.put(shared_buffer)
-                await self.queue_actor.enqueue_in_audio_frame.remote(shared_buffer_ref)
+                await self.queue_actor.enqueue_audio_input_frame.remote(shared_buffer_ref)
         except Exception as e:
             print (e)

@@ -97,7 +97,7 @@ class StreamlitAVQueue:
                 # print (f"frame: {frame.format.name}, {frame.layout.name}, {frame.sample_rate}, {frame.samples}")
                 assert frame.format.bytes == 2
                 assert frame.format.name == 's16'
-                frame_as_bytes = await self.queue_actor.get_out_audio_frame.remote()
+                frame_as_bytes = await self.queue_actor.get_audio_output_frame.remote()
                 if frame_as_bytes:
                     # print(f"frame_as_bytes: {len(frame_as_bytes)}")
                     assert len(frame_as_bytes) == frame.samples * frame.format.bytes
@@ -115,16 +115,16 @@ class StreamlitAVQueue:
             print (e)
         return new_frames

-    async def get_in_audio_frames_async(self) -> List[av.AudioFrame]:
-        shared_buffers = await self.queue_actor.get_in_audio_frames.remote()
+    async def get_audio_input_frames_async(self) -> List[av.AudioFrame]:
+        shared_buffers = await self.queue_actor.get_audio_input_frames.remote()
         return shared_buffers

     async def get_video_frames_async(self) -> List[av.AudioFrame]:
-        shared_tensors = await self.queue_actor.get_in_video_frames.remote()
+        shared_tensors = await self.queue_actor.get_video_input_frames.remote()
         return shared_tensors

-    def get_out_audio_queue(self)->Queue:
-        return self.queue_actor.get_out_audio_queue.remote()
+    def get_audio_output_queue(self)->Queue:
+        return self.queue_actor.get_audio_output_queue.remote()

-    def get_out_video_queue(self)->Queue:
-        return self.queue_actor.get_out_video_queue.remote()
+    def get_video_output_queue(self)->Queue:
+        return self.queue_actor.get_video_output_queue.remote()
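
As a usage note, the renamed facade methods are thin async wrappers over the queue actor, so a caller drains everything buffered since its last step with a single await. A sketch assuming an existing event loop and this module layout (poll_once is illustrative, not from the repo):

import asyncio

from streamlit_av_queue import StreamlitAVQueue  # module name as in this diff


async def poll_once(av_queue: StreamlitAVQueue):
    # One await per stream drains whatever buffered since the previous step.
    audio_frames = await av_queue.get_audio_input_frames_async()
    video_frames = await av_queue.get_video_frames_async()
    return len(audio_frames), len(video_frames)


# e.g. from an async main loop:
#     audio_count, video_count = await poll_once(self._streamlit_av_queue)

One pre-existing quirk the rename leaves untouched: get_video_frames_async is still annotated -> List[av.AudioFrame] even though it returns video tensor refs.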
webrtc_av_queue_actor.py CHANGED
@@ -8,58 +8,58 @@ import numpy as np
 @ray.remote
 class WebRtcAVQueueActor:
     def __init__(self):
-        self.in_audio_queue = Queue(maxsize=3000) # Adjust the size as needed
-        self.in_video_queue = Queue(maxsize=10) # Adjust the size as needed
-        self.out_audio_queue = Queue(maxsize=3000) # Adjust the size as needed
-        self.out_video_queue = Queue(maxsize=10) # Adjust the size as needed
+        self.audio_input_queue = Queue(maxsize=3000) # Adjust the size as needed
+        self.video_input_queue = Queue(maxsize=10) # Adjust the size as needed
+        self.audio_output_queue = Queue(maxsize=3000) # Adjust the size as needed
+        self.video_output_queue = Queue(maxsize=10) # Adjust the size as needed


-    async def enqueue_in_video_frame(self, shared_tensor_ref):
-        if self.in_video_queue.full():
-            evicted_item = await self.in_video_queue.get_async()
+    async def enqueue_video_input_frame(self, shared_tensor_ref):
+        if self.video_input_queue.full():
+            evicted_item = await self.video_input_queue.get_async()
             del evicted_item
-        await self.in_video_queue.put_async(shared_tensor_ref)
+        await self.video_input_queue.put_async(shared_tensor_ref)

-    async def enqueue_in_audio_frame(self, shared_buffer_ref):
-        if self.in_audio_queue.full():
-            evicted_item = await self.in_audio_queue.get_async()
+    async def enqueue_audio_input_frame(self, shared_buffer_ref):
+        if self.audio_input_queue.full():
+            evicted_item = await self.audio_input_queue.get_async()
             del evicted_item
-        await self.in_audio_queue.put_async(shared_buffer_ref)
+        await self.audio_input_queue.put_async(shared_buffer_ref)

-    async def get_in_audio_frames(self):
+    async def get_audio_input_frames(self):
         audio_frames = []
-        if self.in_audio_queue.empty():
+        if self.audio_input_queue.empty():
             return audio_frames
-        while not self.in_audio_queue.empty():
-            shared_tensor_ref = await self.in_audio_queue.get_async()
+        while not self.audio_input_queue.empty():
+            shared_tensor_ref = await self.audio_input_queue.get_async()
             audio_frames.append(shared_tensor_ref)
         return audio_frames

-    async def get_in_video_frames(self):
+    async def get_video_input_frames(self):
         video_frames = []
-        if self.in_video_queue.empty():
+        if self.video_input_queue.empty():
             return video_frames
-        while not self.in_video_queue.empty():
-            shared_tensor_ref = await self.in_video_queue.get_async()
+        while not self.video_input_queue.empty():
+            shared_tensor_ref = await self.video_input_queue.get_async()
             video_frames.append(shared_tensor_ref)
         return video_frames

-    def get_out_audio_queue(self)->Queue:
-        return self.out_audio_queue
+    def get_audio_output_queue(self)->Queue:
+        return self.audio_output_queue

-    def get_out_video_queue(self)->Queue:
-        return self.out_video_queue
+    def get_video_output_queue(self)->Queue:
+        return self.video_output_queue

-    async def get_out_audio_frame(self):
-        if self.out_audio_queue.empty():
+    async def get_audio_output_frame(self):
+        if self.audio_output_queue.empty():
             return None
-        frame = await self.out_audio_queue.get_async()
+        frame = await self.audio_output_queue.get_async()
         return frame

-    async def get_out_video_frame(self):
-        if self.out_video_queue.empty():
+    async def get_video_output_frame(self):
+        if self.video_output_queue.empty():
             return None
         frame = None
-        while not self.out_video_queue.empty():
-            frame = await self.out_video_queue.get_async()
+        while not self.video_output_queue.empty():
+            frame = await self.video_output_queue.get_async()
         return frame
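
Finally, a hedged end-to-end sketch of the renamed actor API, reusing the named-actor options shown in streamlit_av_queue.py above; the standalone entry point is illustrative and not part of this commit:

import asyncio

import ray
from webrtc_av_queue_actor import WebRtcAVQueueActor  # module name per this diff


async def main() -> None:
    queue_actor = WebRtcAVQueueActor.options(
        name="WebRtcAVQueueActor",
        get_if_exists=True,
    ).remote()

    # Producer side: enqueue one audio buffer by ObjectRef.
    await queue_actor.enqueue_audio_input_frame.remote(ray.put(b"\x00\x01"))

    # Consumer side: drain all buffered input, but fetch only the newest
    # output video frame (older ones are discarded in the actor's loop).
    audio_refs = await queue_actor.get_audio_input_frames.remote()
    latest_video = await queue_actor.get_video_output_frame.remote()
    print(len(audio_refs), latest_video)


ray.init()
asyncio.run(main())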