sohojoe commited on
Commit
c4fe843
1 Parent(s): 681abd4

refactor ffmeg_concerter_actor.py to its own class

Browse files
Files changed (2) hide show
  1. debug_app.py +2 -62
  2. ffmpeg_converter_actor.py +62 -0
debug_app.py CHANGED
@@ -1,8 +1,4 @@
1
- import asyncio
2
- import io
3
  import logging
4
- import select
5
- import time
6
  import traceback
7
  from typing import List
8
 
@@ -13,13 +9,13 @@ from streamlit_webrtc import WebRtcMode, webrtc_streamer
13
  import pydub
14
 
15
  from dotenv import load_dotenv
 
16
  load_dotenv()
17
  from sample_utils.turn import get_ice_servers
18
 
19
  logger = logging.getLogger(__name__)
20
 
21
 
22
- import subprocess
23
  import os
24
 
25
  import ray
@@ -32,63 +28,7 @@ if not ray.is_initialized():
32
  else:
33
  ray.init(namespace="project_charles")
34
 
35
- @ray.remote
36
- class FFMpegConverterActor:
37
- def __init__(self, queue: Queue, buffer_size: int = 1920, output_format: str='s16le'):
38
- self.queue = queue
39
- self.buffer_size = 1920
40
- self.output_format = output_format
41
-
42
- self.input_pipe = None
43
- self.output_pipe = None
44
-
45
- self.process = None
46
-
47
- async def run(self):
48
- while True:
49
- chunk = await self.output_pipe.read(self.buffer_size)
50
- await self.queue.put_async(chunk)
51
-
52
- async def start_process(self):
53
- cmd = [
54
- 'ffmpeg',
55
- '-i', 'pipe:0', # read from stdin
56
- '-f', self.output_format,
57
- '-ar', '48000',
58
- '-ac', '1',
59
- 'pipe:1' # write to stdout
60
- ]
61
-
62
- self.process = await asyncio.create_subprocess_exec(
63
- *cmd,
64
- stdin=asyncio.subprocess.PIPE,
65
- stdout=asyncio.subprocess.PIPE,
66
- stderr=asyncio.subprocess.PIPE
67
- )
68
-
69
- self.input_pipe = self.process.stdin
70
- self.output_pipe = self.process.stdout
71
-
72
- assert self.input_pipe is not None, "input_pipe was not initialized"
73
- print (f"input_pipe: {self.input_pipe}")
74
-
75
- async def push_chunk(self, chunk):
76
- try:
77
- self.input_pipe.write(chunk)
78
- # await self.input_pipe.drain()
79
- except BrokenPipeError:
80
- # If the pipe is broken, restart the process.
81
- await self.start_process()
82
- self.input_pipe.write(chunk)
83
- # await self.input_pipe.drain()
84
-
85
- # def has_processed_all_data(self):
86
- # return self.process.poll() is not None
87
-
88
- def close(self):
89
- self.input_pipe.close()
90
- self.output_pipe.close()
91
- self.process.wait()
92
 
93
 
94
  def video_frame_callback(
 
 
 
1
  import logging
 
 
2
  import traceback
3
  from typing import List
4
 
 
9
  import pydub
10
 
11
  from dotenv import load_dotenv
12
+ from ffmpeg_converter_actor import FFMpegConverterActor
13
  load_dotenv()
14
  from sample_utils.turn import get_ice_servers
15
 
16
  logger = logging.getLogger(__name__)
17
 
18
 
 
19
  import os
20
 
21
  import ray
 
28
  else:
29
  ray.init(namespace="project_charles")
30
 
31
+
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
32
 
33
 
34
  def video_frame_callback(
ffmpeg_converter_actor.py ADDED
@@ -0,0 +1,62 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+
2
+ import asyncio
3
+ import ray
4
+ from ray.util.queue import Queue
5
+
6
+ @ray.remote
7
+ class FFMpegConverterActor:
8
+ def __init__(self, queue: Queue, buffer_size: int = 1920, output_format: str='s16le'):
9
+ self.queue = queue
10
+ self.buffer_size = 1920
11
+ self.output_format = output_format
12
+
13
+ self.input_pipe = None
14
+ self.output_pipe = None
15
+
16
+ self.process = None
17
+
18
+ async def run(self):
19
+ while True:
20
+ chunk = await self.output_pipe.read(self.buffer_size)
21
+ await self.queue.put_async(chunk)
22
+
23
+ async def start_process(self):
24
+ cmd = [
25
+ 'ffmpeg',
26
+ '-i', 'pipe:0', # read from stdin
27
+ '-f', self.output_format,
28
+ '-ar', '48000',
29
+ '-ac', '1',
30
+ 'pipe:1' # write to stdout
31
+ ]
32
+
33
+ self.process = await asyncio.create_subprocess_exec(
34
+ *cmd,
35
+ stdin=asyncio.subprocess.PIPE,
36
+ stdout=asyncio.subprocess.PIPE,
37
+ stderr=asyncio.subprocess.PIPE
38
+ )
39
+
40
+ self.input_pipe = self.process.stdin
41
+ self.output_pipe = self.process.stdout
42
+
43
+ assert self.input_pipe is not None, "input_pipe was not initialized"
44
+ print (f"input_pipe: {self.input_pipe}")
45
+
46
+ async def push_chunk(self, chunk):
47
+ try:
48
+ self.input_pipe.write(chunk)
49
+ # await self.input_pipe.drain()
50
+ except BrokenPipeError:
51
+ # If the pipe is broken, restart the process.
52
+ await self.start_process()
53
+ self.input_pipe.write(chunk)
54
+ # await self.input_pipe.drain()
55
+
56
+ # def has_processed_all_data(self):
57
+ # return self.process.poll() is not None
58
+
59
+ def close(self):
60
+ self.input_pipe.close()
61
+ self.output_pipe.close()
62
+ self.process.wait()