sohojoe commited on
Commit
681abd4
1 Parent(s): 6dfcd3a

updated version that uses ray queues so it doesn't block.

Browse files
Files changed (1) hide show
  1. debug_app.py +48 -23
debug_app.py CHANGED
@@ -22,50 +22,68 @@ logger = logging.getLogger(__name__)
22
  import subprocess
23
  import os
24
 
25
- class FFMpegConverter:
26
- def __init__(self, input_format='mp3', output_format='s16le'):
27
- self.input_format = input_format
 
 
 
 
 
 
 
 
 
 
 
 
28
  self.output_format = output_format
29
 
30
  self.input_pipe = None
31
  self.output_pipe = None
32
 
33
  self.process = None
34
- self.start_process()
35
 
36
- def start_process(self):
 
 
 
 
 
37
  cmd = [
38
  'ffmpeg',
39
- # '-y',
40
- # '-f', self.input_format,
41
  '-i', 'pipe:0', # read from stdin
42
  '-f', self.output_format,
43
  '-ar', '48000',
44
  '-ac', '1',
45
- # '-acodec', 'pcm_s16le', # output format
46
  'pipe:1' # write to stdout
47
  ]
48
 
49
- self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
 
 
 
 
 
 
50
  self.input_pipe = self.process.stdin
51
  self.output_pipe = self.process.stdout
52
 
53
- def push_chunk(self, chunk):
 
 
 
54
  try:
55
  self.input_pipe.write(chunk)
56
- self.input_pipe.flush()
57
  except BrokenPipeError:
58
  # If the pipe is broken, restart the process.
59
- self.start_process()
60
  self.input_pipe.write(chunk)
61
- self.input_pipe.flush()
62
-
63
- def read_output(self, num_bytes=1024):
64
- frame = self.output_pipe.read(num_bytes)
65
- return frame
66
 
67
- def has_processed_all_data(self):
68
- return self.process.poll() is not None
69
 
70
  def close(self):
71
  self.input_pipe.close()
@@ -83,9 +101,12 @@ with open("chunks.pkl", "rb") as f:
83
  import pickle
84
  debug_chunks = pickle.load(f)
85
 
86
- converter = FFMpegConverter()
 
 
 
87
  for chunk in debug_chunks:
88
- converter.push_chunk(chunk)
89
 
90
 
91
  # emptry array of type int16
@@ -98,8 +119,12 @@ def process_frame(old_frame):
98
  output_sample_rate = 44100
99
  required_samples = old_frame.samples
100
 
101
- frame_as_bytes = converter.read_output(required_samples*2*1) # 2 bytes per sample, mono
102
- samples = np.frombuffer(frame_as_bytes, dtype=np.int16)
 
 
 
 
103
 
104
  # Duplicate mono channel for stereo
105
  if output_channels == 2:
 
22
  import subprocess
23
  import os
24
 
25
+ import ray
26
+ from ray.util.queue import Queue
27
+ if not ray.is_initialized():
28
+ # Try to connect to a running Ray cluster
29
+ ray_address = os.getenv('RAY_ADDRESS')
30
+ if ray_address:
31
+ ray.init(ray_address, namespace="project_charles")
32
+ else:
33
+ ray.init(namespace="project_charles")
34
+
35
+ @ray.remote
36
+ class FFMpegConverterActor:
37
+ def __init__(self, queue: Queue, buffer_size: int = 1920, output_format: str='s16le'):
38
+ self.queue = queue
39
+ self.buffer_size = 1920
40
  self.output_format = output_format
41
 
42
  self.input_pipe = None
43
  self.output_pipe = None
44
 
45
  self.process = None
 
46
 
47
+ async def run(self):
48
+ while True:
49
+ chunk = await self.output_pipe.read(self.buffer_size)
50
+ await self.queue.put_async(chunk)
51
+
52
+ async def start_process(self):
53
  cmd = [
54
  'ffmpeg',
 
 
55
  '-i', 'pipe:0', # read from stdin
56
  '-f', self.output_format,
57
  '-ar', '48000',
58
  '-ac', '1',
 
59
  'pipe:1' # write to stdout
60
  ]
61
 
62
+ self.process = await asyncio.create_subprocess_exec(
63
+ *cmd,
64
+ stdin=asyncio.subprocess.PIPE,
65
+ stdout=asyncio.subprocess.PIPE,
66
+ stderr=asyncio.subprocess.PIPE
67
+ )
68
+
69
  self.input_pipe = self.process.stdin
70
  self.output_pipe = self.process.stdout
71
 
72
+ assert self.input_pipe is not None, "input_pipe was not initialized"
73
+ print (f"input_pipe: {self.input_pipe}")
74
+
75
+ async def push_chunk(self, chunk):
76
  try:
77
  self.input_pipe.write(chunk)
78
+ # await self.input_pipe.drain()
79
  except BrokenPipeError:
80
  # If the pipe is broken, restart the process.
81
+ await self.start_process()
82
  self.input_pipe.write(chunk)
83
+ # await self.input_pipe.drain()
 
 
 
 
84
 
85
+ # def has_processed_all_data(self):
86
+ # return self.process.poll() is not None
87
 
88
  def close(self):
89
  self.input_pipe.close()
 
101
  import pickle
102
  debug_chunks = pickle.load(f)
103
 
104
+ converter_queue = Queue(maxsize=100)
105
+ converter_actor = FFMpegConverterActor.remote(converter_queue)
106
+ ray.get(converter_actor.start_process.remote())
107
+ converter_actor.run.remote()
108
  for chunk in debug_chunks:
109
+ ray.get(converter_actor.push_chunk.remote(chunk))
110
 
111
 
112
  # emptry array of type int16
 
119
  output_sample_rate = 44100
120
  required_samples = old_frame.samples
121
 
122
+ if not converter_queue.empty():
123
+ frame_as_bytes = converter_queue.get()
124
+ samples = np.frombuffer(frame_as_bytes, dtype=np.int16)
125
+ else:
126
+ # create a byte array of zeros
127
+ samples = np.zeros((required_samples * 2 * 1), dtype=np.int16)
128
 
129
  # Duplicate mono channel for stereo
130
  if output_channels == 2: