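# File-viewer page header captured with the source (kept as comments so the module parses):
# project_charles / streamlit_av_queue.py
# sohojoe's picture
# add missing async / await
# 0d27fd9
# raw
# history blame
# 3.96 kB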
from typing import List
import av
import asyncio
from collections import deque
import threading
import numpy as np
import ray
from webrtc_av_queue_actor import WebRtcAVQueueActor
import pydub
import torch
class StreamlitAVQueue:
    """Bridge between WebRTC frame callbacks and the shared ``WebRtcAVQueueActor``.

    Incoming video and audio frames are forwarded to the Ray actor's input
    queues; outgoing audio is pulled from the actor and returned in place of
    the incoming audio frames.
    """

    def __init__(self, audio_bit_rate=16000):
        """Look up the named ``WebRtcAVQueueActor`` (``get_if_exists=True``).

        Args:
            audio_bit_rate: Frame rate, in Hz, that incoming audio is converted
                to before it is enqueued (it is passed to
                ``AudioSegment.set_frame_rate``).
        """
        self._output_channels = 2
        self._audio_bit_rate = audio_bit_rate
        self.queue_actor = WebRtcAVQueueActor.options(
            name="WebRtcAVQueueActor",
            get_if_exists=True,
        ).remote()

    async def queued_video_frames_callback(
        self,
        frames: "List[av.VideoFrame]",
    ) -> "List[av.VideoFrame]":
        """Enqueue every incoming video frame on the actor; return frames unchanged.

        Each frame is converted to a torch tensor and placed in the Ray object
        store before being handed to ``enqueue_in_video_frame``. Failures are
        reported and swallowed so that the stream keeps running.
        """
        try:
            for frame in frames:
                shared_tensor = torch.from_numpy(frame.to_ndarray())
                shared_tensor_ref = ray.put(shared_tensor)
                await self.queue_actor.enqueue_in_video_frame.remote(shared_tensor_ref)
        except Exception as e:
            print(f"queued_video_frames_callback: failed to enqueue video frame: {e!r}")
        return frames

    async def queued_audio_frames_callback(
        self,
        frames: "List[av.AudioFrame]",
    ) -> "List[av.AudioFrame]":
        """Enqueue incoming audio and return replacement frames from the actor.

        Args:
            frames: Audio frames received since the previous callback.

        Returns:
            One new ``s16`` frame per input frame, filled with audio pulled from
            the actor or with silence when the actor has nothing queued. The
            incoming audio itself is never returned, to avoid echo. If building
            the output fails part-way, the frames built so far are returned.
        """
        await self._enqueue_in_audio(frames)
        return await self._dequeue_out_audio(frames)

    async def _enqueue_in_audio(self, frames):
        """Merge ``frames`` into one mono buffer and enqueue it on the actor."""
        if not frames:
            # Nothing was captured; do not push an empty buffer to the actor.
            return
        try:
            sound_chunk = pydub.AudioSegment.empty()
            for frame in frames:
                sound = pydub.AudioSegment(
                    data=frame.to_ndarray().tobytes(),
                    sample_width=frame.format.bytes,
                    frame_rate=frame.sample_rate,
                    channels=len(frame.layout.channels),
                )
                sound = sound.set_channels(1)
                sound = sound.set_frame_rate(self._audio_bit_rate)
                sound_chunk += sound
            shared_buffer = np.array(sound_chunk.get_array_of_samples())
            shared_buffer_ref = ray.put(shared_buffer)
            await self.queue_actor.enqueue_in_audio_frame.remote(shared_buffer_ref)
        except Exception as e:
            print(f"queued_audio_frames_callback: failed to enqueue audio: {e!r}")

    async def _dequeue_out_audio(self, frames):
        """Build one output frame per input frame from the actor's out queue."""
        new_frames = []
        layout = 'stereo' if self._output_channels == 2 else 'mono'
        try:
            for frame in frames:
                # Validate before touching the queue so that an unsupported
                # frame does not consume queued output audio.
                if frame.format.name != 's16':
                    raise ValueError(
                        f"unsupported audio format {frame.format.name!r}; expected 's16'"
                    )
                frame_as_bytes = await self.queue_actor.get_out_audio_frame.remote()
                samples = self._build_output_samples(
                    frame_as_bytes, frame.samples, self._output_channels
                )
                new_frame = av.AudioFrame.from_ndarray(samples, format='s16', layout=layout)
                new_frame.sample_rate = frame.sample_rate
                new_frames.append(new_frame)
        except Exception as e:
            print(f"queued_audio_frames_callback: failed to build output audio: {e!r}")
        return new_frames

    @staticmethod
    def _build_output_samples(frame_as_bytes, required_samples, output_channels):
        """Return a ``(1, N)`` int16 array for ``av.AudioFrame.from_ndarray``.

        Args:
            frame_as_bytes: Mono 16-bit sample data for one frame, or a falsy
                value (``None`` / ``b""``) when no audio is available.
            required_samples: Number of samples per channel the frame must hold.
            output_channels: ``2`` duplicates every sample into an interleaved
                left/right pair; any other value leaves the data mono.

        Raises:
            ValueError: If ``frame_as_bytes`` does not hold exactly
                ``required_samples`` 16-bit samples.
        """
        if frame_as_bytes:
            expected_bytes = required_samples * np.dtype(np.int16).itemsize
            if len(frame_as_bytes) != expected_bytes:
                raise ValueError(
                    f"expected {expected_bytes} bytes of audio, got {len(frame_as_bytes)}"
                )
            samples = np.frombuffer(frame_as_bytes, dtype=np.int16)
        else:
            # Silence must have the same length as real data (one value per
            # sample); a byte count here would make silent frames twice as long.
            samples = np.zeros(required_samples, dtype=np.int16)
        if output_channels == 2:
            # s0, s0, s1, s1, ... : identical left and right channels.
            samples = np.repeat(samples, 2)
        return samples.reshape(1, -1)

    async def get_in_audio_frames_async(self) -> list:
        """Await and return the actor's ``get_in_audio_frames`` result.

        NOTE(review): the items are presumably the buffers enqueued by the
        audio callback (numpy arrays) rather than ``av.AudioFrame`` objects;
        confirm against the actor.
        """
        return await self.queue_actor.get_in_audio_frames.remote()

    async def get_video_frames_async(self) -> list:
        """Await and return the actor's ``get_in_video_frames`` result.

        NOTE(review): the items are presumably the tensors enqueued by the
        video callback; confirm against the actor.
        """
        return await self.queue_actor.get_in_video_frames.remote()

    def get_out_audio_queue(self):
        """Call the actor's ``get_out_audio_queue`` and return the un-awaited result.

        The value returned by ``.remote()`` is handed back as-is; the caller is
        responsible for resolving it.
        """
        return self.queue_actor.get_out_audio_queue.remote()