project_charles / streamlit_av_queue.py
sohojoe's picture
refactor: rename webrtc... to app_interface...
72e4889
raw
history blame
5.74 kB
from typing import List
import av
import asyncio
from collections import deque
import threading
import cv2
import numpy as np
import ray
from ray.util.queue import Queue
from app_interface_actor import AppInterfaceActor
import pydub
import torch
class StreamlitAVQueue:
def __init__(self, audio_bit_rate=16000):
self._output_channels = 2
self._audio_bit_rate = audio_bit_rate
self._listening = True
self._looking = False
self._lock = threading.Lock()
self.app_interface_actor = AppInterfaceActor.get_singleton()
self._video_output_frame = None
def set_looking_listening(self, looking, listening: bool):
with self._lock:
self._looking = looking
self._listening = listening
async def queued_video_frames_callback(
self,
frames: List[av.VideoFrame],
) -> av.VideoFrame:
updated_frames = []
try:
with self._lock:
should_look = self._looking
next_video_output_frame = await self.app_interface_actor.get_video_output_frame.remote()
if next_video_output_frame is not None:
self._video_output_frame = next_video_output_frame
for i, frame in enumerate(frames):
user_image = frame.to_ndarray(format="rgb24")
if should_look:
shared_tensor_ref = ray.put(user_image)
await self.app_interface_actor.enqueue_video_input_frame.remote(shared_tensor_ref)
if self._video_output_frame is not None:
frame = self._video_output_frame
# resize user image to 1/4 size
user_frame = cv2.resize(user_image, (user_image.shape[1]//4, user_image.shape[0]//4), interpolation=cv2.INTER_AREA)
# flip horizontally
user_frame = cv2.flip(user_frame, 1)
x_user = 0
y_user = frame.shape[0] - user_frame.shape[0]
final_frame = frame.copy()
final_frame[y_user:y_user+user_frame.shape[0], x_user:x_user+user_frame.shape[1]] = user_frame
frame = av.VideoFrame.from_ndarray(final_frame, format="rgb24")
updated_frames.append(frame)
# print (f"tesnor len: {len(shared_tensor)}, tensor shape: {shared_tensor.shape}, tensor type:{shared_tensor.dtype} tensor ref: {shared_tensor_ref}")
except Exception as e:
print (e)
return updated_frames
async def queued_audio_frames_callback(
self,
frames: List[av.AudioFrame],
) -> av.AudioFrame:
try:
with self._lock:
should_listed = self._listening
sound_chunk = pydub.AudioSegment.empty()
if len(frames) > 0 and should_listed:
for frame in frames:
sound = pydub.AudioSegment(
data=frame.to_ndarray().tobytes(),
sample_width=frame.format.bytes,
frame_rate=frame.sample_rate,
channels=len(frame.layout.channels),
)
sound = sound.set_channels(1)
sound = sound.set_frame_rate(self._audio_bit_rate)
sound_chunk += sound
shared_buffer = np.array(sound_chunk.get_array_of_samples())
shared_buffer_ref = ray.put(shared_buffer)
await self.app_interface_actor.enqueue_audio_input_frame.remote(shared_buffer_ref)
except Exception as e:
print (e)
# return empty frames to avoid echo
new_frames = []
try:
for frame in frames:
required_samples = frame.samples
# print (f"frame: {frame.format.name}, {frame.layout.name}, {frame.sample_rate}, {frame.samples}")
assert frame.format.bytes == 2
assert frame.format.name == 's16'
frame_as_bytes = await self.app_interface_actor.get_audio_output_frame.remote()
if frame_as_bytes:
# print(f"frame_as_bytes: {len(frame_as_bytes)}")
assert len(frame_as_bytes) == frame.samples * frame.format.bytes
samples = np.frombuffer(frame_as_bytes, dtype=np.int16)
else:
samples = np.zeros((required_samples * 2 * 1), dtype=np.int16)
if self._output_channels == 2:
samples = np.vstack((samples, samples)).reshape((-1,), order='F')
samples = samples.reshape(1, -1)
layout = 'stereo' if self._output_channels == 2 else 'mono'
new_frame = av.AudioFrame.from_ndarray(samples, format='s16', layout=layout)
new_frame.sample_rate = frame.sample_rate
new_frames.append(new_frame)
except Exception as e:
print (e)
return new_frames
async def get_audio_input_frames_async(self) -> List[av.AudioFrame]:
shared_buffers = await self.app_interface_actor.get_audio_input_frames.remote()
return shared_buffers
async def get_video_frames_async(self) -> List[av.AudioFrame]:
shared_tensors = await self.app_interface_actor.get_video_input_frames.remote()
return shared_tensors
def get_audio_output_queue(self)->Queue:
return self.app_interface_actor.get_audio_output_queue.remote()
def get_video_output_queue(self)->Queue:
return self.app_interface_actor.get_video_output_queue.remote()