from typing import Any, List

import ray
from ray.util.queue import Empty, Full, Queue
from ray.actor import ActorHandle
import torch
import numpy as np


async def _put_evicting_oldest(queue: Queue, item: Any) -> None:
    """Put *item* on *queue*, dropping the oldest entry whenever it is full.

    Uses non-blocking put/get and reacts to ``Full``/``Empty`` instead of
    checking ``queue.full()`` first: the check-then-act form can park the
    coroutine forever in a blocking ``put_async``/``get_async`` if the queue
    state changes between the check and the operation.
    """
    while True:
        try:
            await queue.put_async(item, block=False)
            return
        except Full:
            try:
                # Discard the head of the queue to make room for the new item.
                await queue.get_async(block=False)
            except Empty:
                # Something else emptied the queue in the meantime; just retry.
                pass


async def _drain(queue: Queue) -> List[Any]:
    """Remove and return everything currently in *queue*, oldest first.

    Never waits for new items: stops as soon as a non-blocking get reports
    ``Empty``. Returns an empty list if the queue had nothing in it.
    """
    items: List[Any] = []
    while True:
        try:
            items.append(await queue.get_async(block=False))
        except Empty:
            return items


@ray.remote
class WebRtcAVQueueActor:
    """Ray actor holding bounded input/output queues for audio and video.

    Input queues are written through ``enqueue_*_input_frame`` (oldest entry
    is evicted when the queue is full) and read through
    ``get_*_input_frames`` (drains everything available). Output queues are
    exposed as ``Queue`` handles via ``get_*_output_queue`` and polled through
    ``get_*_output_frame``.

    All polling methods are non-blocking: they return immediately with an
    empty list / ``None`` when nothing is queued.
    """

    def __init__(
        self,
        *,
        audio_queue_size: int = 3000,
        video_queue_size: int = 10,
    ):
        """Create the four queues.

        Args:
            audio_queue_size: Capacity of the audio input and output queues.
            video_queue_size: Capacity of the video input and output queues.
        """
        self.audio_input_queue = Queue(maxsize=audio_queue_size)
        self.video_input_queue = Queue(maxsize=video_queue_size)
        self.audio_output_queue = Queue(maxsize=audio_queue_size)
        self.video_output_queue = Queue(maxsize=video_queue_size)

    async def enqueue_video_input_frame(self, shared_tensor_ref):
        """Add an item to the video input queue, evicting the oldest if full.

        Args:
            shared_tensor_ref: The item to enqueue; stored as-is.
        """
        await _put_evicting_oldest(self.video_input_queue, shared_tensor_ref)

    async def enqueue_audio_input_frame(self, shared_buffer_ref):
        """Add an item to the audio input queue, evicting the oldest if full.

        Args:
            shared_buffer_ref: The item to enqueue; stored as-is.
        """
        await _put_evicting_oldest(self.audio_input_queue, shared_buffer_ref)

    async def get_audio_input_frames(self):
        """Drain the audio input queue.

        Returns:
            A list of all queued items in FIFO order; empty if none are queued.
        """
        return await _drain(self.audio_input_queue)

    async def get_video_input_frames(self):
        """Drain the video input queue.

        Returns:
            A list of all queued items in FIFO order; empty if none are queued.
        """
        return await _drain(self.video_input_queue)

    def get_audio_output_queue(self) -> Queue:
        """Return the audio output ``Queue`` handle."""
        return self.audio_output_queue

    def get_video_output_queue(self) -> Queue:
        """Return the video output ``Queue`` handle."""
        return self.video_output_queue

    async def get_audio_output_frame(self):
        """Pop one item from the audio output queue without waiting.

        Returns:
            The oldest queued item, or ``None`` if the queue is empty.
        """
        try:
            return await self.audio_output_queue.get_async(block=False)
        except Empty:
            return None

    async def get_video_output_frame(self):
        """Drain the video output queue and return only the newest item.

        Older items are discarded so the caller always gets the most recent
        one that was queued.

        Returns:
            The last item removed from the queue, or ``None`` if it was empty.
        """
        latest = None
        while True:
            try:
                # Overwrite on each pass so only the newest item is retained.
                latest = await self.video_output_queue.get_async(block=False)
            except Empty:
                return latest