"""Streamlit WebRTC demo: replaces the outgoing audio track with PCM decoded
by an ``ffmpeg`` subprocess from pre-recorded compressed chunks.

NOTE(review): this is debug/prototype code — it unpickles ``chunks.pkl`` at
import time and streams that audio back over the WebRTC connection.
"""
import asyncio
import io
import logging
import os
import pickle
import select
import subprocess
import time
import traceback
from typing import List

import av
import numpy as np
import pydub
import streamlit as st
from dotenv import load_dotenv
from streamlit_webrtc import WebRtcMode, webrtc_streamer

load_dotenv()

from sample_utils.turn import get_ice_servers

logger = logging.getLogger(__name__)


class FFMpegConverter:
    """Decode a compressed audio stream (default: mp3) to raw PCM by piping
    it through an ``ffmpeg`` subprocess.

    Compressed bytes go to ffmpeg's stdin via :meth:`push_chunk`; decoded
    48 kHz mono s16le PCM is read back from stdout via :meth:`read_output`.
    """

    def __init__(self, input_format='mp3', output_format='s16le'):
        self.input_format = input_format
        self.output_format = output_format
        self.input_pipe = None
        self.output_pipe = None
        self.process = None
        self.start_process()

    def start_process(self):
        """(Re)spawn the ffmpeg subprocess and rebind the stdio pipes."""
        cmd = [
            'ffmpeg',
            '-f', self.input_format,
            '-i', 'pipe:0',            # read compressed audio from stdin
            '-f', self.output_format,  # raw signed 16-bit little-endian PCM
            '-ar', '48000',            # resample to 48 kHz
            '-ac', '1',                # downmix to mono
            'pipe:1',                  # write decoded PCM to stdout
        ]
        self.process = subprocess.Popen(
            cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE
        )
        self.input_pipe = self.process.stdin
        self.output_pipe = self.process.stdout

    def push_chunk(self, chunk):
        """Feed one chunk of compressed audio bytes to ffmpeg.

        If the pipe is broken (ffmpeg has died), restart the process once
        and retry the write.
        """
        try:
            self.input_pipe.write(chunk)
            self.input_pipe.flush()
        except BrokenPipeError:
            # ffmpeg exited underneath us; restart and retry the write once.
            self.start_process()
            self.input_pipe.write(chunk)
            self.input_pipe.flush()

    def read_output(self, num_bytes=1024):
        """Read up to ``num_bytes`` of decoded PCM.

        May block until ffmpeg has produced that much output; returns fewer
        bytes (possibly ``b''``) once ffmpeg's stdout reaches EOF.
        """
        return self.output_pipe.read(num_bytes)

    def has_processed_all_data(self):
        """Return True once the ffmpeg process has exited."""
        return self.process.poll() is not None

    def close(self):
        """Close both pipes and wait for ffmpeg to terminate."""
        self.input_pipe.close()
        self.output_pipe.close()
        self.process.wait()


def video_frame_callback(frame: av.VideoFrame) -> av.VideoFrame:
    """Pass video frames through unchanged."""
    return frame


# Debug fixture: pre-recorded compressed audio chunks, pushed to ffmpeg up
# front so decoded PCM is already buffered when audio frames start arriving.
with open("chunks.pkl", "rb") as f:
    debug_chunks = pickle.load(f)

converter = FFMpegConverter()
for chunk in debug_chunks:
    converter.push_chunk(chunk)

# Empty int16 buffer (currently unused; kept for parity with the original).
sample_buffer = np.zeros((0,), dtype=np.int16)


def process_frame(old_frame):
    """Build a stereo av.AudioFrame for ``old_frame``'s time slot from the
    decoded PCM stream.

    Pulls exactly ``old_frame.samples`` mono samples (2 bytes each) from the
    converter, duplicates the channel into interleaved stereo, and stamps the
    new frame with the original frame's pts/sample_rate so it can replace it
    on the outgoing track.

    Raises: re-raises any exception after logging it (PyAV swallows callback
    errors silently otherwise).
    """
    try:
        output_channels = 2
        required_samples = old_frame.samples
        # 2 bytes per sample, mono stream out of ffmpeg.
        frame_as_bytes = converter.read_output(required_samples * 2)
        samples = np.frombuffer(frame_as_bytes, dtype=np.int16)
        # pipe.read() may return short (or empty) data at EOF; pad with
        # silence so the frame always carries the expected sample count.
        if samples.size < required_samples:
            samples = np.concatenate(
                [samples,
                 np.zeros(required_samples - samples.size, dtype=np.int16)]
            )
        if output_channels == 2:
            # Interleave the mono signal into L/R pairs: [s0, s0, s1, s1, ...]
            samples = np.vstack((samples, samples)).reshape((-1,), order='F')
        samples = samples.reshape(1, -1)  # packed s16 layout: one plane

        layout = 'stereo' if output_channels == 2 else 'mono'
        new_frame = av.AudioFrame.from_ndarray(samples, format='s16', layout=layout)
        # NOTE(review): ffmpeg decodes at 48 kHz; this assumes the WebRTC
        # track also runs at 48 kHz — confirm, otherwise pitch will shift.
        new_frame.sample_rate = old_frame.sample_rate
        new_frame.pts = old_frame.pts
        return new_frame
    except Exception as e:
        print(e)
        traceback.print_exc()
        # Bare raise preserves the original traceback (raise(e) would not).
        raise


def audio_frame_callback(old_frame: av.AudioFrame) -> av.AudioFrame:
    """Replace each incoming audio frame with one built from the ffmpeg PCM."""
    new_frame = process_frame(old_frame)
    return new_frame


webrtc_streamer(
    key="delay",
    mode=WebRtcMode.SENDRECV,
    rtc_configuration={"iceServers": get_ice_servers()},
    video_frame_callback=video_frame_callback,
    audio_frame_callback=audio_frame_callback,
)