project_charles / ffmpeg_converter_actor.py
sohojoe's picture
fix async issues and flush ffmpeg actor
df0ea75
raw
history blame
2.06 kB
import asyncio
import ray
from ray.util.queue import Queue
@ray.remote
class FFMpegConverterActor:
def __init__(self, output_queue: Queue, buffer_size: int = 1920, output_format: str='s16le'):
self.output_queue = output_queue
self.buffer_size = buffer_size
self.output_format = output_format
self.input_pipe = None
self.output_pipe = None
self.process = None
async def run(self):
while True:
chunk = await self.output_pipe.readexactly(self.buffer_size)
# print(f"FFMpegConverterActor: read {len(chunk)} bytes")
await self.output_queue.put_async(chunk)
async def start_process(self):
cmd = [
'ffmpeg',
'-i', 'pipe:0', # read from stdin
'-f', self.output_format,
'-ar', '48000',
'-ac', '1',
'pipe:1' # write to stdout
]
self.process = await asyncio.create_subprocess_exec(
*cmd,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
self.input_pipe = self.process.stdin
self.output_pipe = self.process.stdout
assert self.input_pipe is not None, "input_pipe was not initialized"
# print (f"input_pipe: {self.input_pipe}")
async def push_chunk(self, chunk):
try:
self.input_pipe.write(chunk)
# await self.input_pipe.drain()
except BrokenPipeError:
# If the pipe is broken, restart the process.
await self.start_process()
self.input_pipe.write(chunk)
# await self.input_pipe.drain()
# def has_processed_all_data(self):
# return self.process.poll() is not None
async def flush_output_queue(self):
while not self.output_queue.empty():
await self.output_queue.get_async()
def close(self):
self.input_pipe.close()
self.output_pipe.close()
self.process.wait()