sohojoe commited on
Commit
6dfcd3a
1 Parent(s): d0639dc

basic version that is working

Browse files
Files changed (2) hide show
  1. debug_app.py +60 -87
  2. frames.pkl +0 -3
debug_app.py CHANGED
@@ -1,6 +1,8 @@
1
  import asyncio
2
  import io
3
  import logging
 
 
4
  import traceback
5
  from typing import List
6
 
@@ -17,40 +19,58 @@ from sample_utils.turn import get_ice_servers
17
  logger = logging.getLogger(__name__)
18
 
19
 
20
- class StreamingMP3ToFrames:
21
- def __init__(self):
22
- self.append = False
23
 
24
- def process_chunk(self, chunk):
25
- audio_frames = []
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
26
  try:
27
- if self.append:
28
- self.bytes_io.write(chunk)
29
- self.append = False
30
- self.bytes_io.seek(0)
31
- else:
32
- self.bytes_io = io.BytesIO(chunk)
33
- container = av.open(self.bytes_io, 'r', format='mp3')
34
- audio_stream = next(s for s in container.streams if s.type == 'audio')
35
- for frame in container.decode(audio_stream):
36
- # Convert the audio frame to a NumPy array
37
- array = frame.to_ndarray()
38
-
39
- # Now you can use av.AudioFrame.from_ndarray
40
-
41
- # audio_frame = av.AudioFrame.from_ndarray(array, format='flt', layout='mono')
42
- audio_frame = av.AudioFrame.from_ndarray(array, format='fltp', layout='mono')
43
- audio_frame.sample_rate = 44100
44
-
45
- audio_frames.append(audio_frame)
46
-
47
- return audio_frames
48
-
49
- except Exception as e:
50
- print (e)
51
- self.append = True
52
- self.bytes_io.seek(0, io.SEEK_END)
53
- return audio_frames
54
 
55
 
56
  def video_frame_callback(
@@ -59,27 +79,14 @@ def video_frame_callback(
59
  return frame
60
 
61
 
62
-
63
- streaming_mp3_to_frames = StreamingMP3ToFrames()
64
-
65
  with open("chunks.pkl", "rb") as f:
66
  import pickle
67
  debug_chunks = pickle.load(f)
68
- debug_frames = []
69
- debug_frame_idx = 0
70
  for chunk in debug_chunks:
71
- new_frames = streaming_mp3_to_frames.process_chunk(chunk)
72
- for frame in new_frames:
73
- debug_frames.append(frame)
74
- # print (frame)
75
-
76
- def dequeue_frame():
77
- global debug_frame_idx, debug_frames
78
- enqueued_frame = debug_frames[debug_frame_idx]
79
- debug_frame_idx += 1
80
- if debug_frame_idx >= len(debug_frames):
81
- debug_frame_idx = 0
82
- return enqueued_frame
83
 
84
  # emptry array of type int16
85
  sample_buffer = np.zeros((0), dtype=np.int16)
@@ -91,38 +98,8 @@ def process_frame(old_frame):
91
  output_sample_rate = 44100
92
  required_samples = old_frame.samples
93
 
94
- global sample_buffer
95
- while sample_buffer.shape[0] < required_samples:
96
- dequeued_frame = dequeue_frame()
97
- if dequeued_frame is None:
98
- break
99
-
100
- # convert dequeued_frame to same format as old_frame
101
- float_samples = dequeued_frame.to_ndarray()
102
- max_sample = np.max(np.abs(float_samples))
103
- min_sample = np.min(np.abs(float_samples))
104
- if max_sample > 1.0 or min_sample > 1.0:
105
- print(f"WARNING: max_sample: {max_sample}, min_sample: {min_sample}")
106
- int_samples = np.int16(float_samples * 32767)
107
- sound = pydub.AudioSegment(
108
- data=int_samples.tobytes(),
109
- sample_width=2,
110
- frame_rate=output_sample_rate,
111
- channels=len(dequeued_frame.layout.channels),
112
- )
113
- sound = sound.set_frame_rate(old_frame.sample_rate)
114
-
115
- samples = np.array(sound.get_array_of_samples(), dtype=np.int16)
116
- sample_buffer = np.append(sample_buffer, samples)
117
-
118
- # handle case where we ran out of frames
119
- if sample_buffer.shape[0] < required_samples:
120
- empty_samples = np.zeros((required_samples - sample_buffer.shape[0]), dtype=np.int16)
121
- sample_buffer = np.append(sample_buffer, empty_samples)
122
-
123
- # take the first required_samples samples from the buffer
124
- samples = sample_buffer[:required_samples]
125
- sample_buffer = sample_buffer[required_samples:]
126
 
127
  # Duplicate mono channel for stereo
128
  if output_channels == 2:
@@ -143,14 +120,10 @@ def process_frame(old_frame):
143
 
144
 
145
  def audio_frame_callback(old_frame: av.AudioFrame) -> av.AudioFrame:
146
-
147
- global debug_frame_idx, debug_frames
148
-
149
  new_frame = process_frame(old_frame)
150
 
151
- # print (f"new_frames: {len(new_frames)}, frames: {len(frames)}")
152
- print (f"frame: {old_frame}, pts: {old_frame.pts}")
153
- print (f"new_frame: {new_frame}, pts: {new_frame.pts}")
154
 
155
  return new_frame
156
  # return old_frame
 
1
  import asyncio
2
  import io
3
  import logging
4
+ import select
5
+ import time
6
  import traceback
7
  from typing import List
8
 
 
19
  logger = logging.getLogger(__name__)
20
 
21
 
22
+ import subprocess
23
+ import os
 
24
 
25
+ class FFMpegConverter:
26
+ def __init__(self, input_format='mp3', output_format='s16le'):
27
+ self.input_format = input_format
28
+ self.output_format = output_format
29
+
30
+ self.input_pipe = None
31
+ self.output_pipe = None
32
+
33
+ self.process = None
34
+ self.start_process()
35
+
36
+ def start_process(self):
37
+ cmd = [
38
+ 'ffmpeg',
39
+ # '-y',
40
+ # '-f', self.input_format,
41
+ '-i', 'pipe:0', # read from stdin
42
+ '-f', self.output_format,
43
+ '-ar', '48000',
44
+ '-ac', '1',
45
+ # '-acodec', 'pcm_s16le', # output format
46
+ 'pipe:1' # write to stdout
47
+ ]
48
+
49
+ self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
50
+ self.input_pipe = self.process.stdin
51
+ self.output_pipe = self.process.stdout
52
+
53
+ def push_chunk(self, chunk):
54
  try:
55
+ self.input_pipe.write(chunk)
56
+ self.input_pipe.flush()
57
+ except BrokenPipeError:
58
+ # If the pipe is broken, restart the process.
59
+ self.start_process()
60
+ self.input_pipe.write(chunk)
61
+ self.input_pipe.flush()
62
+
63
+ def read_output(self, num_bytes=1024):
64
+ frame = self.output_pipe.read(num_bytes)
65
+ return frame
66
+
67
+ def has_processed_all_data(self):
68
+ return self.process.poll() is not None
69
+
70
+ def close(self):
71
+ self.input_pipe.close()
72
+ self.output_pipe.close()
73
+ self.process.wait()
 
 
 
 
 
 
 
 
74
 
75
 
76
  def video_frame_callback(
 
79
  return frame
80
 
81
 
 
 
 
82
  with open("chunks.pkl", "rb") as f:
83
  import pickle
84
  debug_chunks = pickle.load(f)
85
+
86
+ converter = FFMpegConverter()
87
  for chunk in debug_chunks:
88
+ converter.push_chunk(chunk)
89
+
 
 
 
 
 
 
 
 
 
 
90
 
91
  # emptry array of type int16
92
  sample_buffer = np.zeros((0), dtype=np.int16)
 
98
  output_sample_rate = 44100
99
  required_samples = old_frame.samples
100
 
101
+ frame_as_bytes = converter.read_output(required_samples*2*1) # 2 bytes per sample, mono
102
+ samples = np.frombuffer(frame_as_bytes, dtype=np.int16)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
103
 
104
  # Duplicate mono channel for stereo
105
  if output_channels == 2:
 
120
 
121
 
122
  def audio_frame_callback(old_frame: av.AudioFrame) -> av.AudioFrame:
 
 
 
123
  new_frame = process_frame(old_frame)
124
 
125
+ # print (f"frame: {old_frame}, pts: {old_frame.pts}")
126
+ # print (f"new_frame: {new_frame}, pts: {new_frame.pts}")
 
127
 
128
  return new_frame
129
  # return old_frame
frames.pkl DELETED
@@ -1,3 +0,0 @@
1
- version https://git-lfs.github.com/spec/v1
2
- oid sha256:d85ef7ee28d01dab9ee6faa439f791a1647cc937e0c68e3e5a73d5bd2f071d7f
3
- size 117337