import ray
from ray.util.queue import Queue


@ray.remote
class AppInterfaceActor:
    """Named singleton actor bridging the app's audio/video input and output queues."""

    def __init__(self):
        # Queue capacities are tunable; adjust maxsize as needed.
        self.audio_input_queue = Queue(maxsize=3000)
        self.video_input_queue = Queue(maxsize=10)
        self.audio_output_queue = Queue(maxsize=3000)
        self.video_output_queue = Queue(maxsize=10)

    @staticmethod
    def get_singleton():
        # A named actor with get_if_exists=True acts as a singleton: the first
        # caller creates it, subsequent callers get a handle to the same actor.
        return AppInterfaceActor.options(
            name="AppInterfaceActor",
            get_if_exists=True,
        ).remote()

    async def enqueue_video_input_frame(self, shared_tensor_ref):
        # Evict the oldest frame when full so the put below does not block.
        if self.video_input_queue.full():
            await self.video_input_queue.get_async()
        await self.video_input_queue.put_async(shared_tensor_ref)

    async def enqueue_audio_input_frame(self, shared_buffer_ref):
        # Same drop-oldest policy for audio buffers.
        if self.audio_input_queue.full():
            await self.audio_input_queue.get_async()
        await self.audio_input_queue.put_async(shared_buffer_ref)

    async def get_audio_input_frames(self):
        # Drain and return every audio frame currently queued (possibly none).
        audio_frames = []
        while not self.audio_input_queue.empty():
            audio_frames.append(await self.audio_input_queue.get_async())
        return audio_frames

    async def get_video_input_frames(self):
        # Drain and return every video frame currently queued (possibly none).
        video_frames = []
        while not self.video_input_queue.empty():
            video_frames.append(await self.video_input_queue.get_async())
        return video_frames
    
    def get_audio_output_queue(self) -> Queue:
        return self.audio_output_queue

    def get_video_output_queue(self) -> Queue:
        return self.video_output_queue
    
    async def get_audio_output_frame(self):
        # Return the next audio output frame, or None if the queue is empty.
        if self.audio_output_queue.empty():
            return None
        return await self.audio_output_queue.get_async()

    async def get_video_output_frame(self):
        # Skip ahead to the most recent video frame, discarding stale ones;
        # returns None if the queue is empty.
        frame = None
        while not self.video_output_queue.empty():
            frame = await self.video_output_queue.get_async()
        return frame
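

if __name__ == "__main__":
    # Minimal usage sketch, assuming a local Ray runtime. The payload below is
    # a placeholder for the shared tensor/buffer refs the real app would pass.
    ray.init(ignore_reinit_error=True)

    # Create (or connect to) the named singleton; this mirrors get_singleton().
    actor = AppInterfaceActor.options(
        name="AppInterfaceActor",
        get_if_exists=True,
    ).remote()

    # Producer side: push one frame; the actor evicts the oldest when full.
    ray.get(actor.enqueue_video_input_frame.remote(b"placeholder-frame"))

    # Consumer side: drain whatever has accumulated so far.
    frames = ray.get(actor.get_video_input_frames.remote())
    print(f"dequeued {len(frames)} video frame(s)")

    ray.shutdown()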