project_charles / webrtc_av_queue_actor.py
sohojoe's picture
refactor: in/out_audio/video to audio/video_input/output
aec6f97
raw
history blame
2.38 kB
import numpy as np
import ray
import torch
from ray.actor import ActorHandle
from ray.util.queue import Empty, Full, Queue
@ray.remote
class WebRtcAVQueueActor:
    """Ray actor holding bounded input/output queues for audio and video items.

    Input queues are filled through ``enqueue_*_input_frame`` and drained
    through ``get_*_input_frames``.  Output queue handles are handed out via
    ``get_*_output_queue``, and single items are read back through
    ``get_*_output_frame``.

    All queue access inside the async methods is non-blocking: a method never
    waits for an item (or a free slot) that another producer/consumer may have
    taken in the meantime, and it avoids the synchronous ``Queue.full()`` /
    ``Queue.empty()`` round trips that would stall the actor's event loop.
    """

    def __init__(self, audio_maxsize=3000, video_maxsize=10):
        """Create the four bounded queues.

        Args:
            audio_maxsize: Capacity of the audio input and output queues.
            video_maxsize: Capacity of the video input and output queues.
        """
        self.audio_input_queue = Queue(maxsize=audio_maxsize)
        self.video_input_queue = Queue(maxsize=video_maxsize)
        self.audio_output_queue = Queue(maxsize=audio_maxsize)
        self.video_output_queue = Queue(maxsize=video_maxsize)

    async def _put_dropping_oldest(self, queue, item):
        """Put ``item`` on ``queue``, evicting the oldest entries while it is full."""
        while True:
            try:
                await queue.put_async(item, block=False)
            except Full:
                try:
                    # Make room by discarding the oldest entry.
                    await queue.get_async(block=False)
                except Empty:
                    # A consumer emptied the queue first; just retry the put.
                    pass
            else:
                return

    async def _drain(self, queue):
        """Return every item currently obtainable from ``queue``, oldest first."""
        items = []
        while True:
            try:
                items.append(await queue.get_async(block=False))
            except Empty:
                return items

    async def enqueue_video_input_frame(self, shared_tensor_ref):
        """Queue a video input item, dropping the oldest one if the queue is full.

        Args:
            shared_tensor_ref: The item to enqueue (presumably a reference to
                a shared frame tensor -- confirm against the caller).
        """
        await self._put_dropping_oldest(self.video_input_queue, shared_tensor_ref)

    async def enqueue_audio_input_frame(self, shared_buffer_ref):
        """Queue an audio input item, dropping the oldest one if the queue is full.

        Args:
            shared_buffer_ref: The item to enqueue (presumably a reference to
                a shared audio buffer -- confirm against the caller).
        """
        await self._put_dropping_oldest(self.audio_input_queue, shared_buffer_ref)

    async def get_audio_input_frames(self):
        """Return all pending audio input items, oldest first (``[]`` if none)."""
        return await self._drain(self.audio_input_queue)

    async def get_video_input_frames(self):
        """Return all pending video input items, oldest first (``[]`` if none)."""
        return await self._drain(self.video_input_queue)

    def get_audio_output_queue(self) -> Queue:
        """Return the audio output queue handle."""
        return self.audio_output_queue

    def get_video_output_queue(self) -> Queue:
        """Return the video output queue handle."""
        return self.video_output_queue

    async def get_audio_output_frame(self):
        """Return the next audio output item, or ``None`` if the queue is empty."""
        try:
            return await self.audio_output_queue.get_async(block=False)
        except Empty:
            return None

    async def get_video_output_frame(self):
        """Return the newest video output item, discarding any older ones.

        Returns ``None`` if the queue is empty.
        """
        frames = await self._drain(self.video_output_queue)
        return frames[-1] if frames else None