import ray from ray.util.queue import Queue from ray.actor import ActorHandle import torch import numpy as np @ray.remote class WebRtcAVQueueActor: def __init__(self): self.in_audio_queue = Queue(maxsize=3000) # Adjust the size as needed self.in_video_queue = Queue(maxsize=10) # Adjust the size as needed self.out_audio_queue = Queue(maxsize=3000) # Adjust the size as needed self.out_video_queue = Queue(maxsize=10) # Adjust the size as needed async def enqueue_in_video_frame(self, shared_tensor_ref): if self.in_video_queue.full(): evicted_item = await self.in_video_queue.get_async() del evicted_item await self.in_video_queue.put_async(shared_tensor_ref) async def enqueue_in_audio_frame(self, shared_buffer_ref): if self.in_audio_queue.full(): evicted_item = await self.in_audio_queue.get_async() del evicted_item await self.in_audio_queue.put_async(shared_buffer_ref) async def get_in_audio_frames(self): audio_frames = [] if self.in_audio_queue.empty(): return audio_frames while not self.in_audio_queue.empty(): shared_tensor_ref = await self.in_audio_queue.get_async() audio_frames.append(shared_tensor_ref) return audio_frames async def get_in_video_frames(self): video_frames = [] if self.in_video_queue.empty(): return video_frames while not self.in_video_queue.empty(): shared_tensor_ref = await self.in_video_queue.get_async() video_frames.append(shared_tensor_ref) return video_frames def get_out_audio_queue(self)->Queue: return self.out_audio_queue def get_out_video_queue(self)->Queue: return self.out_video_queue async def get_out_audio_frame(self): if self.out_audio_queue.empty(): return None frame = await self.out_audio_queue.get_async() return frame async def get_out_video_frame(self): if self.out_video_queue.empty(): return None frame = None while not self.out_video_queue.empty(): frame = await self.out_video_queue.get_async() return frame