project_charles / ffmpeg_converter_actor.py
sohojoe's picture
refactor ffmpeg_converter_actor.py to its own class
c4fe843
raw
history blame
1.81 kB
import asyncio
import ray
from ray.util.queue import Queue
@ray.remote
class FFMpegConverterActor:
    """Ray actor that streams audio through an ``ffmpeg`` subprocess.

    Input bytes are written to ffmpeg's stdin via :meth:`push_chunk`. The
    converted stream (``output_format``, 48000 Hz, 1 channel) is read from
    ffmpeg's stdout by :meth:`run` and forwarded to ``queue``.

    Typical call order: ``start_process`` -> ``run`` (left running) ->
    ``push_chunk`` ... -> ``close``.
    """

    # Seconds to idle between polls once ffmpeg's stdout has reached EOF
    # but the actor has not been closed (e.g. while awaiting a restart).
    _EOF_POLL_INTERVAL = 0.05

    def __init__(self, queue: Queue, buffer_size: int = 1920, output_format: str = 's16le'):
        """Store configuration; the subprocess is started by ``start_process``.

        Args:
            queue: Queue that receives the converted audio chunks.
            buffer_size: Maximum number of bytes requested per stdout read.
            output_format: Value passed to ffmpeg's ``-f`` output option.
        """
        self.queue = queue
        # Honour the caller's value (it was previously overwritten with 1920).
        self.buffer_size = buffer_size
        self.output_format = output_format
        self.input_pipe = None
        self.output_pipe = None
        self.process = None
        self._closed = False

    async def run(self):
        """Forward ffmpeg's stdout to the queue until the actor is closed.

        Empty reads (EOF) are never enqueued. After ``close()`` the loop keeps
        draining until EOF and then returns. If EOF is reached without
        ``close()`` having been called, the loop idles so that a process
        restarted by ``push_chunk`` is picked up on a later iteration.

        Raises:
            RuntimeError: If ``start_process()`` has not completed yet.
        """
        if self.output_pipe is None:
            raise RuntimeError("start_process() must complete before run()")
        while True:
            # Re-read the attribute each pass: a restart swaps the pipe.
            reader = self.output_pipe
            chunk = await reader.read(self.buffer_size)
            if chunk:
                await self.queue.put_async(chunk)
            elif self._closed:
                # EOF after close(): everything ffmpeg produced was forwarded.
                break
            else:
                # read() returns b'' immediately at EOF; sleep instead of
                # spinning and flooding the queue with empty chunks.
                await asyncio.sleep(self._EOF_POLL_INTERVAL)

    async def start_process(self):
        """Spawn ffmpeg reading from stdin and writing raw audio to stdout.

        Raises:
            RuntimeError: If the subprocess was created without a stdin pipe.
        """
        cmd = [
            'ffmpeg',
            '-i', 'pipe:0',  # read from stdin
            '-f', self.output_format,
            '-ar', '48000',
            '-ac', '1',
            'pipe:1',  # write to stdout
        ]
        self.process = await asyncio.create_subprocess_exec(
            *cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            # Nothing in this class reads stderr; an unread PIPE can fill up
            # and block ffmpeg, so discard its log output instead.
            stderr=asyncio.subprocess.DEVNULL,
        )
        self.input_pipe = self.process.stdin
        self.output_pipe = self.process.stdout
        self._closed = False
        if self.input_pipe is None:
            # Explicit check instead of ``assert`` so it survives ``python -O``.
            raise RuntimeError("input_pipe was not initialized")
        print(f"input_pipe: {self.input_pipe}")

    async def push_chunk(self, chunk):
        """Write one chunk of input audio to ffmpeg's stdin.

        If the write raises ``BrokenPipeError`` the subprocess is restarted
        and the chunk is written once more to the new process.

        Args:
            chunk: Bytes to feed to ffmpeg.

        Raises:
            RuntimeError: If ``start_process()`` has not completed yet.
        """
        if self.input_pipe is None:
            raise RuntimeError("start_process() must complete before push_chunk()")
        # NOTE(review): drain() was disabled in the original and is still not
        # awaited here, so writes are buffered without backpressure.
        try:
            self.input_pipe.write(chunk)
        except BrokenPipeError:
            # If the pipe is broken, restart the process.
            await self.start_process()
            self.input_pipe.write(chunk)

    async def close(self):
        """Close ffmpeg's stdin and wait for the process to exit.

        Closing stdin signals end-of-input to ffmpeg. The stdout reader has no
        ``close()`` method, so it is left to reach EOF on its own. ``run()``
        should still be consuming stdout while this waits; otherwise ffmpeg
        may block on a full stdout pipe.
        """
        self._closed = True
        if self.input_pipe is not None:
            self.input_pipe.close()
        if self.process is not None:
            # Process.wait() is a coroutine and must be awaited.
            await self.process.wait()