import asyncio
import io
import logging
import os
import pickle
import select
import subprocess
import time
import traceback
from typing import List

import av
import numpy as np
import pydub
import streamlit as st
from dotenv import load_dotenv
from streamlit_webrtc import WebRtcMode, webrtc_streamer

# Load .env before the remaining imports and before ray.init() below,
# preserving the original ordering of load_dotenv() relative to these imports.
load_dotenv()

import ray  # noqa: E402
from ray.util.queue import Queue  # noqa: E402

from sample_utils.turn import get_ice_servers  # noqa: E402

logger = logging.getLogger(__name__)

# Number of channels in the audio frames returned to the browser.
OUTPUT_CHANNELS = 2

if not ray.is_initialized():
    # Try to connect to a running Ray cluster
    ray_address = os.getenv('RAY_ADDRESS')
    if ray_address:
        ray.init(ray_address, namespace="project_charles")
    else:
        ray.init(namespace="project_charles")


@ray.remote
class FFMpegConverterActor:
    """Ray actor that pipes encoded audio through an ffmpeg subprocess.

    Encoded chunks are written to ffmpeg's stdin with ``push_chunk``. ``run``
    reads ffmpeg's raw output in ``buffer_size``-byte pieces and puts each piece
    on ``queue``. ffmpeg is asked for ``output_format`` at 48 kHz, mono (see
    ``start_process``).

    Args:
        queue: Ray queue that receives the converted output chunks.
        buffer_size: Size in bytes of each chunk placed on ``queue``.
        output_format: ffmpeg ``-f`` output format (default ``'s16le'``).
    """

    def __init__(self, queue: Queue, buffer_size: int = 1920, output_format: str = 's16le'):
        self.queue = queue
        self.buffer_size = buffer_size
        self.output_format = output_format
        self.input_pipe = None
        self.output_pipe = None
        self.process = None

    async def run(self):
        """Forward ffmpeg's stdout to the queue in fixed-size chunks until EOF.

        ``readexactly`` guarantees every queued chunk is exactly ``buffer_size``
        bytes, so consumers can decode it as whole int16 samples. A plain
        ``read(n)`` may return shorter, even odd-length, chunks. At EOF a
        trailing partial chunk is zero-padded to ``buffer_size`` and queued.
        The loop then stops instead of spinning on empty reads.
        """
        while True:
            pipe = self.output_pipe
            try:
                chunk = await pipe.readexactly(self.buffer_size)
            except asyncio.IncompleteReadError as eof:
                if eof.partial:
                    await self.queue.put_async(eof.partial.ljust(self.buffer_size, b"\0"))
                if self.output_pipe is not pipe:
                    # push_chunk restarted ffmpeg: keep reading from the new process.
                    continue
                return
            await self.queue.put_async(chunk)

    async def start_process(self):
        """Spawn ffmpeg that reads from stdin and writes raw audio to stdout.

        Output is ``output_format`` resampled to 48 kHz mono. stderr is
        inherited rather than piped, because a pipe that is never read can
        fill up and stall ffmpeg. ``-hide_banner``, ``-loglevel error`` and
        ``-nostats`` limit that output to errors.

        Raises:
            RuntimeError: if the subprocess stdin/stdout pipes were not created.
        """
        cmd = [
            'ffmpeg',
            '-hide_banner',
            '-loglevel', 'error',
            '-nostats',
            '-i', 'pipe:0',  # read from stdin
            '-f', self.output_format,
            '-ar', '48000',
            '-ac', '1',
            'pipe:1',  # write to stdout
        ]
        self.process = await asyncio.create_subprocess_exec(
            *cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=None,
        )
        self.input_pipe = self.process.stdin
        self.output_pipe = self.process.stdout
        if self.input_pipe is None or self.output_pipe is None:
            raise RuntimeError("ffmpeg pipes were not initialized")
        logger.info("ffmpeg started (pid=%s)", self.process.pid)

    async def push_chunk(self, chunk):
        """Write one encoded chunk to ffmpeg's stdin.

        On ``BrokenPipeError``, ffmpeg is restarted once and the chunk is
        re-sent. The write is intentionally not followed by ``drain()``. The
        driver code calls this synchronously (via ``ray.get``) before anything
        consumes the queue. Once the queue and ffmpeg's output back up, awaiting
        ``drain()`` could block forever.

        NOTE(review): without ``drain()`` the transport's write buffer is
        unbounded. asyncio's ``StreamWriter.write`` usually reports a dead pipe
        through ``drain()`` rather than raising, so the restart path may rarely
        trigger — verify.
        """
        try:
            self.input_pipe.write(chunk)
        except BrokenPipeError:
            # If the pipe is broken, restart the process.
            await self.start_process()
            self.input_pipe.write(chunk)

    async def close(self):
        """Close ffmpeg's stdin (end of input) and wait for the process to exit."""
        if self.process is None:
            return
        try:
            self.input_pipe.close()
            await self.input_pipe.wait_closed()
        except (BrokenPipeError, ConnectionResetError):
            # ffmpeg already went away; nothing left to flush.
            pass
        # stdout is a StreamReader (no close()); it is finished once the process exits.
        # Process.wait() is a coroutine and must be awaited to actually wait.
        await self.process.wait()


def video_frame_callback(
    frame: av.VideoFrame,
) -> av.VideoFrame:
    """Return the incoming video frame unchanged."""
    return frame


# NOTE(review): Streamlit re-executes this whole script on every rerun, so each
# rerun creates a new queue, actor and ffmpeg process and re-pushes the debug
# chunks. Consider caching them (e.g. st.cache_resource) if that is unintended.

# Debug input: presumably an iterable of encoded audio byte chunks — verify.
# NOTE(review): pickle.load can execute arbitrary code; only load trusted local files.
with open("chunks.pkl", "rb") as f:
    debug_chunks = pickle.load(f)

converter_queue = Queue(maxsize=100)
converter_actor = FFMpegConverterActor.remote(converter_queue)
ray.get(converter_actor.start_process.remote())
converter_actor.run.remote()
for chunk in debug_chunks:
    ray.get(converter_actor.push_chunk.remote(chunk))


def process_frame(old_frame):
    """Build an outgoing audio frame from the next converted PCM chunk.

    Takes the next chunk from ``converter_queue`` and decodes it as int16
    samples. If the queue is empty, it uses ``old_frame.samples`` samples of
    silence instead. When ``OUTPUT_CHANNELS == 2`` the mono samples are
    duplicated into interleaved stereo. The returned frame copies
    ``sample_rate`` and ``pts`` from ``old_frame``.

    Raises:
        Exception: any error is logged with its traceback and re-raised.
    """
    try:
        if not converter_queue.empty():
            samples = np.frombuffer(converter_queue.get(), dtype=np.int16)
        else:
            # np.zeros counts elements, not bytes: one int16 per mono sample,
            # so the silent frame has as many samples as the incoming one.
            samples = np.zeros(old_frame.samples, dtype=np.int16)

        if OUTPUT_CHANNELS == 2:
            # Duplicate each mono sample into interleaved L/R: [a, b] -> [a, a, b, b].
            samples = np.repeat(samples, 2)
        # Packed s16 audio is passed to PyAV as a single plane: shape (1, N).
        samples = samples.reshape(1, -1)

        layout = 'stereo' if OUTPUT_CHANNELS == 2 else 'mono'
        new_frame = av.AudioFrame.from_ndarray(samples, format='s16', layout=layout)
        new_frame.sample_rate = old_frame.sample_rate
        new_frame.pts = old_frame.pts
        return new_frame
    except Exception:
        logger.exception("process_frame failed")
        raise


def audio_frame_callback(old_frame: av.AudioFrame) -> av.AudioFrame:
    """Replace the incoming audio frame with converted ffmpeg output (or silence)."""
    return process_frame(old_frame)


webrtc_streamer(
    key="delay",
    mode=WebRtcMode.SENDRECV,
    rtc_configuration={"iceServers": get_ice_servers()},
    video_frame_callback=video_frame_callback,
    audio_frame_callback=audio_frame_callback,
)