import ray
from ray.util.queue import Queue
from ray.actor import ActorHandle
import torch
import numpy as np


@ray.remote
class WebRtcAVQueueActor:
    """Ray actor that buffers inbound/outbound WebRTC audio and video frames.

    Inbound frames are held in bounded queues of shared-object references;
    when a queue is full the oldest entry is evicted so a `put` never blocks
    the actor and the freshest frames are kept.
    """

    def __init__(self):
        # Bounded queues; maxsize=100 is a tunable buffer depth — adjust as needed.
        self.in_audio_queue = Queue(maxsize=100)
        self.in_video_queue = Queue(maxsize=100)
        self.out_audio_queue = Queue(maxsize=100)

    @staticmethod
    def _put_evicting_oldest(queue, item):
        """Put *item* on *queue*, first evicting the oldest entry if full.

        Evicting keeps `put()` from blocking when producers outpace consumers.
        """
        if queue.full():
            evicted_item = queue.get()
            # Drop our reference right away so the evicted shared object can be freed.
            del evicted_item
        queue.put(item)

    @staticmethod
    def _drain(queue):
        """Return a list of every item currently in *queue* (empty list if none)."""
        items = []
        while not queue.empty():
            items.append(queue.get())
        return items

    def enqueue_in_video_frame(self, shared_tensor_ref):
        """Buffer one inbound video frame reference, evicting the oldest if full."""
        self._put_evicting_oldest(self.in_video_queue, shared_tensor_ref)

    def enqueue_in_audio_frame(self, shared_buffer_ref):
        """Buffer one inbound audio frame reference, evicting the oldest if full."""
        self._put_evicting_oldest(self.in_audio_queue, shared_buffer_ref)

    def get_in_audio_frames(self):
        """Drain and return all buffered inbound audio frame references."""
        return self._drain(self.in_audio_queue)

    def get_in_video_frames(self):
        """Drain and return all buffered inbound video frame references."""
        return self._drain(self.in_video_queue)

    def get_out_audio_queue(self):
        """Expose the outbound audio queue handle so a producer can push into it."""
        return self.out_audio_queue

    async def get_out_audio_frame(self):
        """Return the next outbound audio frame, or None when none is pending.

        The emptiness pre-check keeps this non-blocking: we only await
        `get_async()` when an item is already available.
        """
        if self.out_audio_queue.empty():
            return None
        audio_frame = await self.out_audio_queue.get_async()
        return audio_frame