File size: 1,878 Bytes
ac35a95
 
 
 
 
 
 
 
 
 
 
 
d91a673
 
ac35a95
0d27fd9
ac35a95
0d27fd9
ac35a95
0d27fd9
ac35a95
0d27fd9
ac35a95
0d27fd9
ac35a95
0d27fd9
ac35a95
 
0d27fd9
ac35a95
 
 
 
0d27fd9
ac35a95
 
 
0d27fd9
ac35a95
 
 
 
0d27fd9
ac35a95
 
d91a673
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import ray
from ray.util.queue import Queue
from ray.actor import ActorHandle
import torch
import numpy as np


@ray.remote
class WebRtcAVQueueActor:
    def __init__(self):
        self.in_audio_queue = Queue(maxsize=100)  # Adjust the size as needed
        self.in_video_queue = Queue(maxsize=100)  # Adjust the size as needed
        self.out_audio_queue = Queue(maxsize=100)  # Adjust the size as needed


    async def enqueue_in_video_frame(self, shared_tensor_ref):
        if self.in_video_queue.full():
            evicted_item = await self.in_video_queue.get_async()
            del evicted_item
        await self.in_video_queue.put_async(shared_tensor_ref)

    async def enqueue_in_audio_frame(self, shared_buffer_ref):
        if self.in_audio_queue.full():
            evicted_item = await self.in_audio_queue.get_async()
            del evicted_item
        await self.in_audio_queue.put_async(shared_buffer_ref)


    async def get_in_audio_frames(self):
        audio_frames = []
        if self.in_audio_queue.empty():
            return audio_frames
        while not self.in_audio_queue.empty():
            shared_tensor_ref = await self.in_audio_queue.get_async()
            audio_frames.append(shared_tensor_ref)
        return audio_frames

    async def get_in_video_frames(self):
        video_frames = []
        if self.in_video_queue.empty():
            return video_frames
        while not self.in_video_queue.empty():
            shared_tensor_ref = await self.in_video_queue.get_async()
            video_frames.append(shared_tensor_ref)
        return video_frames
    
    def get_out_audio_queue(self):
        return self.out_audio_queue
    
    async def get_out_audio_frame(self):
        if self.out_audio_queue.empty():
            return None
        audio_frame = await self.out_audio_queue.get_async()
        return audio_frame