from pydub import AudioSegment
from tqdm import tqdm
import numpy as np

from .utils import run_command
from .logging_setup import logger
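

# Mixer collects (position_ms, AudioSegment) pairs and renders them in a
# single numpy pass, instead of chaining AudioSegment.overlay() calls, which
# would copy the whole base track on every overlay. It follows the mixer
# recipe commonly used with pydub.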
class Mixer:
    def __init__(self):
        self.parts = []

    def __len__(self):
        parts = self._sync()
        seg = parts[0][1]
        frame_count = max(offset + seg.frame_count() for offset, seg in parts)
        return int(1000.0 * frame_count / seg.frame_rate)

    def overlay(self, sound, position=0):
        self.parts.append((position, sound))
        return self

    def _sync(self):
        positions, segs = zip(*self.parts)

        frame_rate = segs[0].frame_rate
        array_type = segs[0].array_type  # noqa

        offsets = [int(frame_rate * pos / 1000.0) for pos in positions]
        segs = AudioSegment.empty()._sync(*segs)
        return list(zip(offsets, segs))

    def append(self, sound):
        self.overlay(sound, position=len(self))
    def to_audio_segment(self):
        parts = self._sync()
        seg = parts[0][1]
        channels = seg.channels

        frame_count = max(offset + seg.frame_count() for offset, seg in parts)
        sample_count = int(frame_count * seg.channels)

        # Mix into an int32 buffer so overlapping parts cannot overflow.
        output = np.zeros(sample_count, dtype="int32")
        for offset, seg in parts:
            sample_offset = offset * channels
            # Read in the segment's native sample width and widen to int32
            # (frombuffer with a fixed dtype would misread 16-bit data).
            samples = np.array(seg.get_array_of_samples(), dtype="int32")
            peak = np.max(np.abs(samples))
            if peak > 0:  # skip silent parts to avoid dividing by zero
                samples = np.int16(samples / peak * 32767)
            start = sample_offset
            end = start + len(samples)
            output[start:end] += samples

        return seg._spawn(
            output, overrides={"sample_width": 4}
        ).normalize(headroom=0.0)
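

# create_translated_audio builds the final dubbed track. With concat=True the
# per-segment files are simply joined back to back with ffmpeg's concat
# demuxer; otherwise each segment is overlaid on a silent base track at its
# diarized start time, optionally shifting segments to avoid speaker overlap.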
def create_translated_audio(
    result_diarize, audio_files, final_file, concat=False, avoid_overlap=False,
):
    total_duration = result_diarize["segments"][-1]["end"]  # in seconds
    if concat:
        # list.txt layout expected by ffmpeg's concat demuxer:
        #   file audio/1.ogg
        #   file audio/2.ogg
        #   ...
        with open("list.txt", "w") as file:
            for audio_file in audio_files:
                file.write(f"file {audio_file}\n")

        # Re-encode to 16-bit PCM so segments with mixed codecs join cleanly.
        command = (
            f"ffmpeg -f concat -safe 0 -i list.txt -c:a pcm_s16le {final_file}"
        )
        run_command(command)
    else:
        # Silent base track spanning the full translated duration.
        base_audio = AudioSegment.silent(
            duration=int(total_duration * 1000), frame_rate=44100
        )
        combined_audio = Mixer()
        combined_audio.overlay(base_audio)

        logger.debug(
            f"Audio duration: {int(total_duration // 60)} "
            f"minutes and {int(total_duration % 60)} seconds"
        )
        last_end_time = 0
        previous_speaker = ""
        for line, audio_file in tqdm(
            zip(result_diarize["segments"], audio_files),
            total=len(audio_files),
        ):
            start = float(line["start"])

            # Overlay each audio segment at its corresponding time.
            try:
                audio = AudioSegment.from_file(audio_file)

                if avoid_overlap:
                    speaker = line["speaker"]
                    if (last_end_time - 0.500) > start:
                        overlap_time = last_end_time - start
                        # Leave a wider gap on a speaker change, a shorter
                        # one when the same speaker continues.
                        if previous_speaker and previous_speaker != speaker:
                            start = last_end_time - 0.500
                        else:
                            start = last_end_time - 0.200
                        # Claw back part of an unusually long overlap.
                        if overlap_time > 2.5:
                            start = start - 0.3
                        logger.info(
                            f"Avoid overlap for {str(audio_file)} "
                            f"with {str(start)}"
                        )

                    previous_speaker = speaker

                    duration_tts_seconds = len(audio) / 1000.0  # to sec
                    last_end_time = start + duration_tts_seconds

                start_time = start * 1000  # to ms
                combined_audio = combined_audio.overlay(
                    audio, position=start_time
                )
            except Exception as error:
                logger.debug(str(error))
                logger.error(f"Error processing audio file {audio_file}")
        # Render the mix and write the final track.
        combined_audio_data = combined_audio.to_audio_segment()
        combined_audio_data.export(
            final_file, format="wav"
        )  # wav is more reliable than ogg; change it if the audio is anomalous
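

if __name__ == "__main__":
    # Minimal usage sketch: the two-segment diarization dict and the file
    # names below are hypothetical stand-ins for the pipeline's real
    # diarization output and per-segment TTS files.
    example_diarize = {
        "segments": [
            {"start": 0.0, "end": 2.5, "speaker": "SPEAKER_00"},
            {"start": 2.2, "end": 5.0, "speaker": "SPEAKER_01"},
        ]
    }
    create_translated_audio(
        example_diarize,
        ["audio/1.ogg", "audio/2.ogg"],
        "translated_audio.wav",
        avoid_overlap=True,
    )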