from collections import deque
from typing import List
import asyncio
import threading

import av
import numpy as np


class StreamlitAVQueue:
    """Buffer audio and video frames handed to the frame callbacks.

    The two ``queued_*_frames_callback`` coroutines append incoming frames to
    internal deques; ``get_audio_frames`` / ``get_video_frames`` drain those
    deques from the consumer side. Each deque is guarded by its own
    ``threading.Lock`` so producers and consumers may run on different
    threads.

    NOTE(review): the deques are unbounded. If the consumer stops calling the
    ``get_*_frames`` methods while frames keep arriving, memory grows without
    limit.
    """

    def __init__(self):
        self.audio_frames_deque_lock = threading.Lock()
        self.audio_frames_deque: deque = deque()
        self.video_frames_deque_lock = threading.Lock()
        self.video_frames_deque: deque = deque()

    async def queued_video_frames_callback(
        self,
        frames: List[av.VideoFrame],
    ) -> List[av.VideoFrame]:
        """Queue the incoming video frames and pass them through unchanged.

        Args:
            frames: Video frames received since the previous call.

        Returns:
            The same ``frames`` list that was passed in.
        """
        with self.video_frames_deque_lock:
            self.video_frames_deque.extend(frames)
        return frames

    async def queued_audio_frames_callback(
        self,
        frames: List[av.AudioFrame],
    ) -> List[av.AudioFrame]:
        """Queue the incoming audio frames and return silent replacements.

        Args:
            frames: Audio frames received since the previous call.

        Returns:
            One all-zero audio frame per input frame, with the same sample
            format, channel layout, sample count and sample rate.
        """
        with self.audio_frames_deque_lock:
            self.audio_frames_deque.extend(frames)

        # Return empty (silent) frames to avoid echo.
        silent_frames = []
        for frame in frames:
            samples = frame.to_ndarray()
            # Pass the source format explicitly: from_ndarray otherwise
            # assumes its default format, which would not match the zero
            # array's dtype/shape for non-default input formats.
            silent = av.AudioFrame.from_ndarray(
                np.zeros(samples.shape, dtype=samples.dtype),
                format=frame.format.name,
                layout=frame.layout.name,
            )
            silent.sample_rate = frame.sample_rate
            silent_frames.append(silent)
        return silent_frames

    @staticmethod
    def _drain(lock: threading.Lock, frames: deque) -> list:
        """Atomically snapshot and empty ``frames`` while holding ``lock``."""
        with lock:
            drained = list(frames)
            frames.clear()
        return drained

    def get_audio_frames(self) -> List[av.AudioFrame]:
        """Return all queued audio frames in arrival order and clear the queue."""
        return self._drain(self.audio_frames_deque_lock, self.audio_frames_deque)

    def get_video_frames(self) -> List[av.VideoFrame]:
        """Return all queued video frames in arrival order and clear the queue."""
        return self._drain(self.video_frames_deque_lock, self.video_frames_deque)