File size: 4,239 Bytes
162d5c8
 
 
 
 
 
 
62a21bd
ac35a95
62a21bd
 
162d5c8
 
62a21bd
d91a673
62a21bd
e2846c4
 
ac35a95
 
ad67495
 
e2846c4
 
 
 
ad67495
162d5c8
 
 
 
62a21bd
 
 
 
0d27fd9
62a21bd
 
162d5c8
 
 
 
 
 
62a21bd
e2846c4
 
62a21bd
e2846c4
62a21bd
 
 
 
 
 
 
 
 
 
 
 
0d27fd9
62a21bd
 
 
162d5c8
 
d91a673
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
162d5c8
768e92a
ac35a95
 
62a21bd
768e92a
 
ac35a95
d91a673
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
from typing import List
import av
import asyncio
from collections import deque
import threading

import numpy as np
import ray
from webrtc_av_queue_actor import WebRtcAVQueueActor
import pydub
import torch

class StreamlitAVQueue:
    def __init__(self, audio_bit_rate=16000):
        self._output_channels = 2
        self._audio_bit_rate = audio_bit_rate
        self._listening = True
        self._lock = threading.Lock()
        self.queue_actor = WebRtcAVQueueActor.options(
            name="WebRtcAVQueueActor", 
            get_if_exists=True,
            ).remote() 
        
    def set_listening(self, listening: bool):
        with self._lock:
            self._listening = listening
            
    async def queued_video_frames_callback(
                self,
                frames: List[av.AudioFrame],
            ) -> av.AudioFrame:
        try:
            for frame in frames:
                shared_tensor = torch.from_numpy(frame.to_ndarray())
                shared_tensor_ref = ray.put(shared_tensor)
                await self.queue_actor.enqueue_in_video_frame.remote(shared_tensor_ref)
        except Exception as e:
            print (e)
        return frames

    async def queued_audio_frames_callback(
                self,
                frames: List[av.AudioFrame],
            ) -> av.AudioFrame:
        try:
            with self._lock:
                should_listed = self._listening
            sound_chunk = pydub.AudioSegment.empty()
            if len(frames) > 0 and should_listed:
                for frame in frames:
                    sound = pydub.AudioSegment(
                        data=frame.to_ndarray().tobytes(),
                        sample_width=frame.format.bytes,
                        frame_rate=frame.sample_rate,
                        channels=len(frame.layout.channels),
                    )
                    sound = sound.set_channels(1)
                    sound = sound.set_frame_rate(self._audio_bit_rate)
                    sound_chunk += sound
                shared_buffer = np.array(sound_chunk.get_array_of_samples())
                shared_buffer_ref = ray.put(shared_buffer)
                await self.queue_actor.enqueue_in_audio_frame.remote(shared_buffer_ref)
        except Exception as e:
            print (e)
            
        # return empty frames to avoid echo
        new_frames = []
        try:
            for frame in frames:
                required_samples = frame.samples
                # print (f"frame: {frame.format.name}, {frame.layout.name}, {frame.sample_rate}, {frame.samples}")
                assert frame.format.bytes == 2
                assert frame.format.name == 's16'
                frame_as_bytes = await self.queue_actor.get_out_audio_frame.remote()
                if frame_as_bytes:
                    # print(f"frame_as_bytes: {len(frame_as_bytes)}")
                    assert len(frame_as_bytes) == frame.samples * frame.format.bytes
                    samples = np.frombuffer(frame_as_bytes, dtype=np.int16)
                else:
                    samples = np.zeros((required_samples * 2 * 1), dtype=np.int16)
                if self._output_channels == 2:
                    samples = np.vstack((samples, samples)).reshape((-1,), order='F')
                samples = samples.reshape(1, -1)
                layout = 'stereo' if self._output_channels == 2 else 'mono'
                new_frame = av.AudioFrame.from_ndarray(samples, format='s16', layout=layout)
                new_frame.sample_rate = frame.sample_rate
                new_frames.append(new_frame)
        except Exception as e:
            print (e)
        return new_frames

    async def get_in_audio_frames_async(self) -> List[av.AudioFrame]:
        shared_buffers = await self.queue_actor.get_in_audio_frames.remote()
        return shared_buffers

    async def get_video_frames_async(self) -> List[av.AudioFrame]:
        shared_tensors = await self.queue_actor.get_in_video_frames.remote()
        return shared_tensors
    
    def get_out_audio_queue(self):
        return self.queue_actor.get_out_audio_queue.remote()
    
    # def get_out_audio_frame(self):
    #     return self.queue_actor.get_out_audio_frame.remote()