"""Streamlit WebRTC page that replaces outgoing audio with converter output.

On startup the script connects to Ray, creates an ``FFMpegConverterActor`` bound
to ``converter_queue`` and pushes pre-recorded debug chunks from ``chunks.pkl``
into it. Every incoming WebRTC audio frame is then answered with the next block
of int16 PCM bytes taken from ``converter_queue`` (or with silence when none is
available). Video frames are passed through unchanged.
"""
import logging
import os
import pickle
import traceback
from typing import List

import av
import numpy as np
import pydub
import streamlit as st
from dotenv import load_dotenv
from streamlit_webrtc import WebRtcMode, webrtc_streamer

from ffmpeg_converter_actor import FFMpegConverterActor

# Populate os.environ from .env before the imports/initialisation below:
# RAY_ADDRESS is read further down, and sample_utils was originally imported
# after this call (presumably it reads env vars at import time — confirm).
load_dotenv()

import ray  # noqa: E402
from ray.util.queue import Empty, Queue  # noqa: E402
from sample_utils.turn import get_ice_servers  # noqa: E402

logger = logging.getLogger(__name__)

# Number of channels in the outgoing frames. Queue bytes are treated as mono
# int16 samples and duplicated into both channels when this is 2.
OUTPUT_CHANNELS = 2

if not ray.is_initialized():
    # Join the cluster named by RAY_ADDRESS when it is set; otherwise let
    # ray.init choose its default (typically a local instance).
    ray_address = os.getenv("RAY_ADDRESS")
    if ray_address:
        ray.init(ray_address, namespace="project_charles")
    else:
        ray.init(namespace="project_charles")


def video_frame_callback(
    frame: av.VideoFrame,
) -> av.VideoFrame:
    """Return the incoming video frame unchanged (video pass-through)."""
    return frame


# NOTE(review): pickle.load can execute arbitrary code embedded in the file;
# only ever point this at a trusted, locally produced chunks.pkl.
with open("chunks.pkl", "rb") as f:
    debug_chunks = pickle.load(f)

converter_queue = Queue(maxsize=100)
converter_actor = FFMpegConverterActor.remote(converter_queue)
ray.get(converter_actor.start_process.remote())
# Fire-and-forget: the returned ref is discarded. Presumably run() loops and
# writes converted PCM into converter_queue — TODO confirm in the actor.
converter_actor.run.remote()
for chunk in debug_chunks:
    # Block on each push so chunks are handed over one at a time, in order.
    ray.get(converter_actor.push_chunk.remote(chunk))

# NOTE(review): Streamlit re-executes this script on every rerun, which creates
# a fresh actor/queue and re-pushes the chunks each time; consider caching them
# (e.g. st.cache_resource) if that is not intended.

# Empty int16 array. NOTE(review): not referenced anywhere in this file.
sample_buffer = np.zeros((0), dtype=np.int16)


def process_frame(old_frame: av.AudioFrame) -> av.AudioFrame:
    """Build the outgoing audio frame that replaces ``old_frame``.

    Takes the next block of int16 PCM bytes from ``converter_queue`` without
    blocking. When the queue is empty, one frame of silence the same length as
    ``old_frame`` is used instead. Samples are treated as mono and duplicated
    into ``OUTPUT_CHANNELS`` channels of packed s16 audio.

    Args:
        old_frame: Incoming audio frame; only its ``samples``, ``sample_rate``
            and ``pts`` are read.

    Returns:
        A new ``av.AudioFrame`` carrying ``old_frame``'s sample rate and pts.

    Raises:
        Exception: any error while building the frame is logged with its
            traceback and re-raised unchanged.
    """
    try:
        try:
            # Single remote call instead of empty() followed by get().
            frame_as_bytes = converter_queue.get_nowait()
        except Empty:
            # No converted audio ready: one frame of mono silence. np.zeros
            # counts int16 elements (not bytes), so one element per sample.
            samples = np.zeros(old_frame.samples, dtype=np.int16)
        else:
            samples = np.frombuffer(frame_as_bytes, dtype=np.int16)

        if OUTPUT_CHANNELS == 2:
            # Interleave L/R by duplicating each mono sample: [a, b] -> [a, a, b, b].
            samples = np.repeat(samples, 2)
        # Packed (non-planar) s16 data is handed to PyAV as a single row.
        samples = samples.reshape(1, -1)

        layout = "stereo" if OUTPUT_CHANNELS == 2 else "mono"
        new_frame = av.AudioFrame.from_ndarray(samples, format="s16", layout=layout)
        new_frame.sample_rate = old_frame.sample_rate
        new_frame.pts = old_frame.pts
        return new_frame
    except Exception:
        logger.exception("Failed to build replacement audio frame")
        # Bare raise keeps the original traceback intact.
        raise


def audio_frame_callback(old_frame: av.AudioFrame) -> av.AudioFrame:
    """WebRTC audio callback: replace each incoming frame via ``process_frame``."""
    return process_frame(old_frame)


webrtc_streamer(
    key="delay",
    mode=WebRtcMode.SENDRECV,
    rtc_configuration={"iceServers": get_ice_servers()},
    video_frame_callback=video_frame_callback,
    audio_frame_callback=audio_frame_callback,
)