File size: 2,782 Bytes
d0639dc
 
 
 
 
 
 
 
 
 
 
c4fe843
d0639dc
 
 
 
 
 
6dfcd3a
d0639dc
681abd4
 
 
 
 
 
 
 
 
 
c4fe843
d0639dc
 
 
 
 
 
 
 
 
 
 
6dfcd3a
681abd4
 
 
 
d0639dc
681abd4
6dfcd3a
d0639dc
 
 
 
 
 
 
 
 
 
 
681abd4
 
d91a673
681abd4
 
 
 
d0639dc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6dfcd3a
 
d0639dc
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import logging
import traceback
from typing import List

import av
import numpy as np
import streamlit as st
from streamlit_webrtc import WebRtcMode, webrtc_streamer
import pydub

from dotenv import load_dotenv
from ffmpeg_converter_actor import FFMpegConverterActor
load_dotenv()
from sample_utils.turn import get_ice_servers

logger = logging.getLogger(__name__)


import os

import ray
from ray.util.queue import Queue
# Initialise Ray once per process. A non-empty RAY_ADDRESS environment variable
# is passed through as the address to connect to; otherwise ray.init() is left
# to its default address handling.
if not ray.is_initialized():
    ray.init(address=os.getenv('RAY_ADDRESS') or None, namespace="project_charles")




def video_frame_callback(frame: av.VideoFrame) -> av.VideoFrame:
    """Pass each incoming video frame through unmodified."""
    return frame


import pickle

# Load the pre-recorded debug chunks. The file handle is only needed for the
# load itself, so it is closed before any Ray work starts.
# NOTE(security): pickle.load can execute arbitrary code while unpickling;
# chunks.pkl must come from a trusted source.
with open("chunks.pkl", "rb") as f:
    debug_chunks = pickle.load(f)

# Bounded queue handed to the converter actor; process_frame() reads from it.
converter_queue = Queue(maxsize=100)
converter_actor = FFMpegConverterActor.remote(converter_queue)

# Block until start_process has completed before launching run().
ray.get(converter_actor.start_process.remote())
# Fire-and-forget: the result of run() is never awaited here.
converter_actor.run.remote()

# Push the chunks one at a time, waiting for each push to complete before
# submitting the next one.
for chunk in debug_chunks:
    ray.get(converter_actor.push_chunk.remote(chunk))


# Empty one-dimensional array of type int16.
sample_buffer = np.array([], dtype=np.int16)

def process_frame(old_frame):
    """Build the outgoing audio frame that replaces ``old_frame``.

    Takes the next chunk of int16 samples from ``converter_queue`` when one is
    immediately available, otherwise substitutes silence. Every sample is then
    duplicated for stereo output and the result is wrapped in a new packed
    ``s16`` ``av.AudioFrame`` carrying the sample rate and pts of
    ``old_frame``.

    Args:
        old_frame: Incoming audio frame; its ``samples``, ``sample_rate`` and
            ``pts`` attributes are read.

    Returns:
        The newly built ``av.AudioFrame``.

    Raises:
        Exception: Any error raised while building the frame is logged with
            its traceback and re-raised unchanged.
    """
    output_channels = 2

    try:
        required_samples = old_frame.samples

        try:
            # One non-blocking fetch instead of empty() followed by a blocking
            # get(): the audio callback can no longer stall if the queue is
            # drained between the two calls.
            frame_as_bytes = converter_queue.get_nowait()
        except ray.util.queue.Empty:
            # Nothing available yet: emit silence.
            # NOTE(review): this is required_samples * 2 values before the
            # stereo duplication below, i.e. twice old_frame.samples per
            # channel afterwards - confirm it matches the chunk size the
            # converter emits.
            samples = np.zeros(required_samples * 2, dtype=np.int16)
        else:
            samples = np.frombuffer(frame_as_bytes, dtype=np.int16)

        # Duplicate mono channel for stereo: s0, s0, s1, s1, ...
        if output_channels == 2:
            samples = np.repeat(samples, 2)

        # Packed formats take a single row holding the interleaved samples.
        samples = samples.reshape(1, -1)

        layout = 'stereo' if output_channels == 2 else 'mono'
        new_frame = av.AudioFrame.from_ndarray(samples, format='s16', layout=layout)
        new_frame.sample_rate = old_frame.sample_rate
        new_frame.pts = old_frame.pts
        return new_frame
    except Exception:
        # Log with traceback, then re-raise the original exception as-is.
        logger.exception("process_frame failed")
        raise



def audio_frame_callback(old_frame: av.AudioFrame) -> av.AudioFrame:
    """Return the frame produced by ``process_frame`` in place of ``old_frame``."""
    return process_frame(old_frame)


# Start the WebRTC component in send/receive mode, using the ICE servers
# returned by get_ice_servers() and the two frame callbacks of this module.
webrtc_streamer(
    key="delay",
    mode=WebRtcMode.SENDRECV,
    rtc_configuration={"iceServers": get_ice_servers()},
    video_frame_callback=video_frame_callback,
    audio_frame_callback=audio_frame_callback,
)