import logging
import os
import pickle
import traceback

import av
import numpy as np
import ray
import streamlit as st
from dotenv import load_dotenv
from ray.util.queue import Queue
from streamlit_webrtc import WebRtcMode, webrtc_streamer

from ffmpeg_converter_actor import FFMpegConverterActor

# Load environment variables (e.g. RAY_ADDRESS, TURN credentials) before
# importing modules that may read them at import time.
load_dotenv()

from sample_utils.turn import get_ice_servers

logger = logging.getLogger(__name__)
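
# Connect to an existing Ray cluster when RAY_ADDRESS is set; otherwise start a
# local Ray instance. Using the "project_charles" namespace lets other processes
# in the same namespace share named Ray actors.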
if not ray.is_initialized():
    ray_address = os.getenv("RAY_ADDRESS")
    if ray_address:
        # Attach to the running cluster.
        ray.init(address=ray_address, namespace="project_charles")
    else:
        # No cluster address configured: start Ray locally.
        ray.init(namespace="project_charles")


def video_frame_callback(
    frame: av.VideoFrame,
) -> av.VideoFrame:
    # Video is passed through unchanged; only audio is transformed below.
    return frame
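
# chunks.pkl is assumed to contain pre-recorded, encoded audio chunks that are
# replayed through the FFMpeg converter actor below for debugging.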
with open("chunks.pkl", "rb") as f:
    debug_chunks = pickle.load(f)

converter_queue = Queue(maxsize=100)
converter_actor = FFMpegConverterActor.remote(converter_queue)
ray.get(converter_actor.start_process.remote())
converter_actor.run.remote()

# Feed the debug chunks to the converter actor; the converted PCM bytes are
# placed on converter_queue for the audio callback to consume.
for chunk in debug_chunks:
    ray.get(converter_actor.push_chunk.remote(chunk))

# Empty int16 sample buffer.
sample_buffer = np.zeros((0,), dtype=np.int16)


def process_frame(old_frame: av.AudioFrame) -> av.AudioFrame:
    try:
        output_channels = 2
        output_sample_rate = 44100
        required_samples = old_frame.samples

        if not converter_queue.empty():
            # Converted PCM produced by the FFMpeg converter actor.
            frame_as_bytes = converter_queue.get()
            samples = np.frombuffer(frame_as_bytes, dtype=np.int16)
        else:
            # No converted audio available yet: emit silence instead.
            samples = np.zeros((required_samples * 2 * 1), dtype=np.int16)

        if output_channels == 2:
            # Duplicate the mono samples and interleave them for stereo output.
            samples = np.vstack((samples, samples)).reshape((-1,), order="F")

        # Packed s16 frames expect shape (1, n_samples * n_channels).
        samples = samples.reshape(1, -1)
        layout = "stereo" if output_channels == 2 else "mono"
        new_frame = av.AudioFrame.from_ndarray(samples, format="s16", layout=layout)
        new_frame.sample_rate = old_frame.sample_rate
        new_frame.pts = old_frame.pts
        return new_frame
    except Exception as e:
        print(e)
        traceback.print_exc()
        raise


def audio_frame_callback(old_frame: av.AudioFrame) -> av.AudioFrame:
    new_frame = process_frame(old_frame)
    # print(f"frame: {old_frame}, pts: {old_frame.pts}")
    # print(f"new_frame: {new_frame}, pts: {new_frame.pts}")
    return new_frame
    # return old_frame
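
# SENDRECV streams the browser's microphone and camera to the server and plays
# back whatever the frame callbacks return; get_ice_servers() supplies the
# STUN/TURN configuration so the peer connection works across NATs.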
webrtc_streamer(
    key="delay",
    mode=WebRtcMode.SENDRECV,
    rtc_configuration={"iceServers": get_ice_servers()},
    video_frame_callback=video_frame_callback,
    audio_frame_callback=audio_frame_callback,
)
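
# This is a Streamlit app; launch it with, e.g.:
#   streamlit run <this_file>.py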