sohojoe committed
Commit 62a21bd
1 Parent(s): ff684ee

start of moving to ray

Files changed (3)
  1. d_app.py +9 -13
  2. input_av_queue_actor.py +37 -0
  3. streamlit_av_queue.py +40 -20
d_app.py CHANGED
@@ -24,6 +24,12 @@ load_dotenv()
 
 webrtc_ctx = None
 
+# Initialize Ray
+import ray
+if not ray.is_initialized():
+    ray.init()
+
+
 
 async def main():
 
@@ -98,21 +104,11 @@ async def main():
         if len(st.session_state.debug_queue) > 0:
             prompt = st.session_state.debug_queue.pop(0)
             await st.session_state.chat_pipeline.enqueue(prompt)
-        sound_chunk = pydub.AudioSegment.empty()
         audio_frames = st.session_state.streamlit_av_queue.get_audio_frames()
         if len(audio_frames) > 0:
-            for audio_frame in audio_frames:
-                sound = pydub.AudioSegment(
-                    data=audio_frame.to_ndarray().tobytes(),
-                    sample_width=audio_frame.format.bytes,
-                    frame_rate=audio_frame.sample_rate,
-                    channels=len(audio_frame.layout.channels),
-                )
-                sound = sound.set_channels(1)
-                sound = sound.set_frame_rate(st.session_state.speech_to_text_vosk.get_audio_bit_rate())
-                sound_chunk += sound
-            buffer = np.array(sound_chunk.get_array_of_samples())
-            st.session_state.speech_to_text_vosk.add_speech_bytes(buffer.tobytes())
+            # Concatenate all audio frames into a single buffer
+            audio_buffer = b"".join([buffer.tobytes() for buffer in audio_frames])
+            st.session_state.speech_to_text_vosk.add_speech_bytes(audio_buffer)
         prompt, speaker_finished = st.session_state.speech_to_text_vosk.get_text()
         if speaker_finished and len(prompt) > 0:
             print(f"Prompt: {prompt}")
input_av_queue_actor.py ADDED
@@ -0,0 +1,37 @@
+import ray
+from ray.util.queue import Queue
+from ray.actor import ActorHandle
+import torch
+import numpy as np
+
+
+@ray.remote
+class InputAVQueueActor:
+    def __init__(self):
+        self.audio_queue = Queue(maxsize=100)  # Adjust the size as needed
+        self.video_queue = Queue(maxsize=100)  # Adjust the size as needed
+
+    def enqueue_video_frame(self, shared_tensor_ref):
+        self.video_queue.put(shared_tensor_ref)
+
+    def enqueue_audio_frame(self, shared_buffer_ref):
+        self.audio_queue.put(shared_buffer_ref)
+
+
+    def get_audio_frames(self):
+        audio_frames = []
+        if self.audio_queue.empty():
+            return audio_frames
+        while not self.audio_queue.empty():
+            shared_tensor_ref = self.audio_queue.get()
+            audio_frames.append(shared_tensor_ref)
+        return audio_frames
+
+    def get_video_frames(self):
+        video_frames = []
+        if self.video_queue.empty():
+            return video_frames
+        while not self.video_queue.empty():
+            shared_tensor_ref = self.video_queue.get()
+            video_frames.append(shared_tensor_ref)
+        return video_frames
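A quick round-trip check for the new actor (a sketch, assuming a local Ray instance; the buffer size and dtype are illustrative). Note that Ray resolves a top-level ObjectRef argument to its value before the actor method runs, so the queues end up holding the arrays themselves:

import ray
import numpy as np
from input_av_queue_actor import InputAVQueueActor

ray.init(ignore_reinit_error=True)

# Same named-actor pattern that streamlit_av_queue.py uses.
actor = InputAVQueueActor.options(name="InputAVQueueActor").remote()

# Producer: put the buffer in the object store, enqueue the reference,
# and block until the actor has processed it.
chunk = np.zeros(160, dtype=np.int16)
ray.get(actor.enqueue_audio_frame.remote(ray.put(chunk)))

# Consumer: drain everything queued since the last poll.
buffers = ray.get(actor.get_audio_frames.remote())
print(len(buffers), buffers[0].dtype)  # 1 int16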
streamlit_av_queue.py CHANGED
@@ -5,29 +5,55 @@ from collections import deque
 import threading
 
 import numpy as np
+import ray
+from input_av_queue_actor import InputAVQueueActor
+import pydub
+import torch
 
 class StreamlitAVQueue:
-    def __init__(self):
-        self.audio_frames_deque_lock = threading.Lock()
-        self.audio_frames_deque: deque = deque([])
-
-        self.video_frames_deque_lock = threading.Lock()
-        self.video_frames_deque: deque = deque([])
+    def __init__(self, audio_bit_rate=16000):
+        self._audio_bit_rate = audio_bit_rate
+        try:
+            self.queue_actor = ray.get_actor("InputAVQueueActor")
+        except ValueError as e:
+            self.queue_actor = InputAVQueueActor.options(name="InputAVQueueActor").remote()
 
     async def queued_video_frames_callback(
         self,
         frames: List[av.AudioFrame],
     ) -> av.AudioFrame:
-        with self.video_frames_deque_lock:
-            self.video_frames_deque.extend(frames)
+        try:
+            for frame in frames:
+                shared_tensor = torch.from_numpy(frame.to_ndarray())
+                shared_tensor_ref = ray.put(shared_tensor)
+                self.queue_actor.enqueue_video_frame.remote(shared_tensor_ref)
+        except Exception as e:
+            print(e)
         return frames
 
     async def queued_audio_frames_callback(
         self,
         frames: List[av.AudioFrame],
     ) -> av.AudioFrame:
-        with self.audio_frames_deque_lock:
-            self.audio_frames_deque.extend(frames)
+        try:
+            sound_chunk = pydub.AudioSegment.empty()
+            if len(frames) > 0:
+                for frame in frames:
+                    sound = pydub.AudioSegment(
+                        data=frame.to_ndarray().tobytes(),
+                        sample_width=frame.format.bytes,
+                        frame_rate=frame.sample_rate,
+                        channels=len(frame.layout.channels),
+                    )
+                    sound = sound.set_channels(1)
+                    sound = sound.set_frame_rate(self._audio_bit_rate)
+                    sound_chunk += sound
+                shared_buffer = np.array(sound_chunk.get_array_of_samples())
+                shared_buffer_ref = ray.put(shared_buffer)
+                self.queue_actor.enqueue_audio_frame.remote(shared_buffer_ref)
+        except Exception as e:
+            print(e)
+
         # return empty frames to avoid echo
         new_frames = []
         for frame in frames:
@@ -41,15 +67,9 @@ class StreamlitAVQueue:
         return new_frames
 
     def get_audio_frames(self) -> List[av.AudioFrame]:
-        audio_frames = []
-        with self.audio_frames_deque_lock:
-            audio_frames = list(self.audio_frames_deque)
-            self.audio_frames_deque.clear()
-        return audio_frames
+        shared_buffers = ray.get(self.queue_actor.get_audio_frames.remote())
+        return shared_buffers
 
     def get_video_frames(self) -> List[av.AudioFrame]:
-        video_frames = []
-        with self.video_frames_deque_lock:
-            video_frames = list(self.video_frames_deque)
-            self.video_frames_deque.clear()
-        return video_frames
+        shared_tensors = ray.get(self.queue_actor.get_video_frames.remote())
+        return shared_tensors
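One detail to watch in the new __init__: two Streamlit sessions starting at the same time can both miss ray.get_actor and then race to create the named actor, in which case the loser's .options(name=...) call raises. A hedged sketch of a more defensive variant; get_if_exists is available in newer Ray releases and makes the lookup-or-create atomic:

import ray
from input_av_queue_actor import InputAVQueueActor

def get_or_create_queue_actor():
    # Atomic variant of the try/except ValueError pattern in __init__:
    # returns the existing "InputAVQueueActor" if the name is taken,
    # otherwise creates it, with no window for another session to race.
    return InputAVQueueActor.options(
        name="InputAVQueueActor", get_if_exists=True
    ).remote()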