# project_charles / debug_app.py
# Updated version that uses Ray queues so it doesn't block.
import asyncio
import io
import logging
import select
import time
import traceback
from typing import List
import av
import numpy as np
import streamlit as st
from streamlit_webrtc import WebRtcMode, webrtc_streamer
import pydub
from dotenv import load_dotenv
load_dotenv()
from sample_utils.turn import get_ice_servers
logger = logging.getLogger(__name__)
import subprocess
import os
import ray
from ray.util.queue import Queue
if not ray.is_initialized():
    # Try to connect to a running Ray cluster; otherwise start a local one.
    ray_address = os.getenv('RAY_ADDRESS')
    if ray_address:
        ray.init(address=ray_address, namespace="project_charles")
    else:
        ray.init(namespace="project_charles")
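
# FFMpegConverterActor wraps a long-lived ffmpeg subprocess in a Ray actor:
# encoded audio pushed into ffmpeg's stdin via push_chunk() comes back out of
# stdout as converted audio, which run() streams into a Ray Queue for the
# Streamlit audio callback to consume without blocking.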
@ray.remote
class FFMpegConverterActor:
    def __init__(self, queue: Queue, buffer_size: int = 1920, output_format: str = 's16le'):
        self.queue = queue
        self.buffer_size = buffer_size
        self.output_format = output_format
        self.input_pipe = None
        self.output_pipe = None
        self.process = None
    async def run(self):
        # Stream converted audio from ffmpeg's stdout into the Ray queue.
        while True:
            chunk = await self.output_pipe.read(self.buffer_size)
            if not chunk:
                # EOF: ffmpeg exited or closed its stdout.
                break
            await self.queue.put_async(chunk)
    async def start_process(self):
        # ffmpeg reads encoded audio from stdin and writes converted 48 kHz mono
        # audio (s16le PCM by default) to stdout.
        cmd = [
            'ffmpeg',
            '-i', 'pipe:0',            # read from stdin
            '-f', self.output_format,
            '-ar', '48000',
            '-ac', '1',
            'pipe:1'                   # write to stdout
        ]
        self.process = await asyncio.create_subprocess_exec(
            *cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        self.input_pipe = self.process.stdin
        self.output_pipe = self.process.stdout
        assert self.input_pipe is not None, "input_pipe was not initialized"
        print(f"input_pipe: {self.input_pipe}")
async def push_chunk(self, chunk):
try:
self.input_pipe.write(chunk)
# await self.input_pipe.drain()
except BrokenPipeError:
# If the pipe is broken, restart the process.
await self.start_process()
self.input_pipe.write(chunk)
# await self.input_pipe.drain()
    # def has_processed_all_data(self):
    #     return self.process.poll() is not None

    async def close(self):
        # process.stdout is a StreamReader and has no close(); closing stdin
        # signals EOF to ffmpeg, and wait() must be awaited on an asyncio subprocess.
        self.input_pipe.close()
        await self.process.wait()
def video_frame_callback(
frame: av.VideoFrame,
) -> av.VideoFrame:
return frame
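
# Debug input: instead of live audio, replay a pickled list of pre-recorded
# audio chunks (chunks.pkl, loaded from the working directory) through the
# converter actor.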
with open("chunks.pkl", "rb") as f:
import pickle
debug_chunks = pickle.load(f)
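
# Bounded queue between the actor and the Streamlit callbacks: put_async()
# waits once 100 converted chunks are buffered, back-pressuring the ffmpeg
# reader. Because the actor's methods are async, run() can await reads in the
# background while push_chunk() calls are still serviced.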
converter_queue = Queue(maxsize=100)
converter_actor = FFMpegConverterActor.remote(converter_queue)
ray.get(converter_actor.start_process.remote())
converter_actor.run.remote()
for chunk in debug_chunks:
ray.get(converter_actor.push_chunk.remote(chunk))
# empty array of type int16
sample_buffer = np.zeros((0), dtype=np.int16)
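
# process_frame builds each outgoing audio frame: pull one converted chunk
# from the Ray queue if available (silence otherwise), duplicate the mono
# samples into stereo, and wrap them in a new av.AudioFrame that reuses the
# incoming frame's sample rate and pts.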
def process_frame(old_frame):
try:
output_channels = 2
output_sample_rate = 44100
required_samples = old_frame.samples
if not converter_queue.empty():
frame_as_bytes = converter_queue.get()
samples = np.frombuffer(frame_as_bytes, dtype=np.int16)
        else:
            # no converted audio is ready yet; output silence for this frame
            samples = np.zeros((required_samples * 2 * 1), dtype=np.int16)
# Duplicate mono channel for stereo
if output_channels == 2:
samples = np.vstack((samples, samples)).reshape((-1,), order='F')
samples = samples.reshape(1, -1)
layout = 'stereo' if output_channels == 2 else 'mono'
new_frame = av.AudioFrame.from_ndarray(samples, format='s16', layout=layout)
new_frame.sample_rate = old_frame.sample_rate
new_frame.pts = old_frame.pts
return new_frame
    except Exception as e:
        print(e)
        traceback.print_exc()
        raise
def audio_frame_callback(old_frame: av.AudioFrame) -> av.AudioFrame:
new_frame = process_frame(old_frame)
# print (f"frame: {old_frame}, pts: {old_frame.pts}")
# print (f"new_frame: {new_frame}, pts: {new_frame.pts}")
return new_frame
# return old_frame
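
# SENDRECV echoes the browser's video back unchanged while the audio callback
# substitutes the converted debug stream.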
webrtc_streamer(
key="delay",
mode=WebRtcMode.SENDRECV,
rtc_configuration={"iceServers": get_ice_servers()},
video_frame_callback=video_frame_callback,
audio_frame_callback=audio_frame_callback,
)