File size: 2,056 Bytes
c4fe843
 
 
 
 
 
 
ae52b65
 
d91a673
c4fe843
 
 
 
 
 
 
 
 
d91a673
 
ae52b65
c4fe843
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d91a673
c4fe843
 
 
 
9b7ec10
c4fe843
 
 
 
9b7ec10
c4fe843
 
 
 
df0ea75
ae52b65
df0ea75
ae52b65
c4fe843
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67

import asyncio
import ray
from ray.util.queue import Queue

@ray.remote
class FFMpegConverterActor:
    def __init__(self, output_queue: Queue, buffer_size: int = 1920, output_format: str='s16le'):
        self.output_queue = output_queue
        self.buffer_size = buffer_size
        self.output_format = output_format

        self.input_pipe = None
        self.output_pipe = None

        self.process = None

    async def run(self):
        while True:
            chunk = await self.output_pipe.readexactly(self.buffer_size)
            # print(f"FFMpegConverterActor: read {len(chunk)} bytes")
            await self.output_queue.put_async(chunk)       

    async def start_process(self):
        cmd = [
            'ffmpeg',
            '-i', 'pipe:0',  # read from stdin
            '-f', self.output_format,
            '-ar', '48000',
            '-ac', '1',
            'pipe:1'  # write to stdout
        ]

        self.process = await asyncio.create_subprocess_exec(
            *cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        self.input_pipe = self.process.stdin
        self.output_pipe = self.process.stdout

        assert self.input_pipe is not None, "input_pipe was not initialized"
        # print (f"input_pipe: {self.input_pipe}")

    async def push_chunk(self, chunk):
        try:
            self.input_pipe.write(chunk)
            await self.input_pipe.drain()
        except BrokenPipeError:
            # If the pipe is broken, restart the process.
            await self.start_process()
            self.input_pipe.write(chunk)
            await self.input_pipe.drain()

    # def has_processed_all_data(self):
    #     return self.process.poll() is not None

    async def flush_output_queue(self):
        while not self.output_queue.empty():
            await self.output_queue.get_async()   

    def close(self):
        self.input_pipe.close()
        self.output_pipe.close()
        self.process.wait()