Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
import os | |
import re | |
import tempfile | |
import shutil | |
from tqdm import tqdm | |
import shlex | |
from ._streams import AudioStream, VideoStream, SubtitleStream | |
from ._errors import FFmpegNormalizeError | |
from ._cmd_utils import NUL, CommandRunner, DUR_REGEX, to_ms | |
from ._logger import setup_custom_logger | |
logger = setup_custom_logger("ffmpeg_normalize") | |
class MediaFile: | |
""" | |
Class that holds a file, its streams and adjustments | |
""" | |
def __init__(self, ffmpeg_normalize, input_file, output_file=None): | |
""" | |
Initialize a media file for later normalization. | |
Arguments: | |
ffmpeg_normalize {FFmpegNormalize} -- reference to overall settings | |
input_file {str} -- Path to input file | |
Keyword Arguments: | |
output_file {str} -- Path to output file (default: {None}) | |
""" | |
self.ffmpeg_normalize = ffmpeg_normalize | |
self.skip = False | |
self.input_file = input_file | |
self.output_file = output_file | |
self.streams = {"audio": {}, "video": {}, "subtitle": {}} | |
self.parse_streams() | |
def _stream_ids(self): | |
return ( | |
list(self.streams["audio"].keys()) | |
+ list(self.streams["video"].keys()) | |
+ list(self.streams["subtitle"].keys()) | |
) | |
def __repr__(self): | |
return os.path.basename(self.input_file) | |
def parse_streams(self): | |
""" | |
Try to parse all input streams from file | |
""" | |
logger.debug(f"Parsing streams of {self.input_file}") | |
cmd = [ | |
self.ffmpeg_normalize.ffmpeg_exe, | |
"-i", | |
self.input_file, | |
"-c", | |
"copy", | |
"-t", | |
"0", | |
"-map", | |
"0", | |
"-f", | |
"null", | |
NUL, | |
] | |
cmd_runner = CommandRunner(cmd) | |
cmd_runner.run_command() | |
output = cmd_runner.get_output() | |
logger.debug("Stream parsing command output:") | |
logger.debug(output) | |
output_lines = [line.strip() for line in output.split("\n")] | |
duration = None | |
for line in output_lines: | |
if "Duration" in line: | |
duration_search = DUR_REGEX.search(line) | |
if not duration_search: | |
logger.warning("Could not extract duration from input file!") | |
else: | |
duration = duration_search.groupdict() | |
duration = to_ms(**duration) / 1000 | |
logger.debug("Found duration: " + str(duration) + " s") | |
if not line.startswith("Stream"): | |
continue | |
stream_id_match = re.search(r"#0:([\d]+)", line) | |
if stream_id_match: | |
stream_id = int(stream_id_match.group(1)) | |
if stream_id in self._stream_ids(): | |
continue | |
else: | |
continue | |
if "Audio" in line: | |
logger.debug(f"Found audio stream at index {stream_id}") | |
sample_rate_match = re.search(r"(\d+) Hz", line) | |
sample_rate = ( | |
int(sample_rate_match.group(1)) if sample_rate_match else None | |
) | |
bit_depth_match = re.search(r"s(\d+)p?,", line) | |
bit_depth = int(bit_depth_match.group(1)) if bit_depth_match else None | |
self.streams["audio"][stream_id] = AudioStream( | |
self, | |
self.ffmpeg_normalize, | |
stream_id, | |
sample_rate, | |
bit_depth, | |
duration, | |
) | |
elif "Video" in line: | |
logger.debug(f"Found video stream at index {stream_id}") | |
self.streams["video"][stream_id] = VideoStream( | |
self, self.ffmpeg_normalize, stream_id | |
) | |
elif "Subtitle" in line: | |
logger.debug(f"Found subtitle stream at index {stream_id}") | |
self.streams["subtitle"][stream_id] = SubtitleStream( | |
self, self.ffmpeg_normalize, stream_id | |
) | |
if not self.streams["audio"]: | |
raise FFmpegNormalizeError( | |
f"Input file {self.input_file} does not contain any audio streams" | |
) | |
if ( | |
os.path.splitext(self.output_file)[1].lower() in [".wav", ".mp3", ".aac"] | |
and len(self.streams["audio"].values()) > 1 | |
): | |
logger.warning( | |
"Output file only supports one stream. " | |
"Keeping only first audio stream." | |
) | |
first_stream = list(self.streams["audio"].values())[0] | |
self.streams["audio"] = {first_stream.stream_id: first_stream} | |
self.streams["video"] = {} | |
self.streams["subtitle"] = {} | |
def run_normalization(self): | |
logger.debug(f"Running normalization for {self.input_file}") | |
# run the first pass to get loudness stats | |
self._first_pass() | |
# run the second pass as a whole | |
if self.ffmpeg_normalize.progress: | |
with tqdm(total=100, position=1, desc="Second Pass") as pbar: | |
for progress in self._second_pass(): | |
pbar.update(progress - pbar.n) | |
else: | |
for _ in self._second_pass(): | |
pass | |
def _first_pass(self): | |
logger.debug(f"Parsing normalization info for {self.input_file}") | |
for index, audio_stream in enumerate(self.streams["audio"].values()): | |
if self.ffmpeg_normalize.normalization_type == "ebu": | |
fun = getattr(audio_stream, "parse_loudnorm_stats") | |
else: | |
fun = getattr(audio_stream, "parse_volumedetect_stats") | |
if self.ffmpeg_normalize.progress: | |
with tqdm( | |
total=100, | |
position=1, | |
desc=f"Stream {index + 1}/{len(self.streams['audio'].values())}", | |
) as pbar: | |
for progress in fun(): | |
pbar.update(progress - pbar.n) | |
else: | |
for _ in fun(): | |
pass | |
if self.ffmpeg_normalize.print_stats: | |
stats = [ | |
audio_stream.get_stats() | |
for audio_stream in self.streams["audio"].values() | |
] | |
self.ffmpeg_normalize.stats.extend(stats) | |
def _get_audio_filter_cmd(self): | |
""" | |
Return filter_complex command and output labels needed | |
""" | |
filter_chains = [] | |
output_labels = [] | |
for audio_stream in self.streams["audio"].values(): | |
if self.ffmpeg_normalize.normalization_type == "ebu": | |
normalization_filter = audio_stream.get_second_pass_opts_ebu() | |
else: | |
normalization_filter = audio_stream.get_second_pass_opts_peakrms() | |
input_label = f"[0:{audio_stream.stream_id}]" | |
output_label = f"[norm{audio_stream.stream_id}]" | |
output_labels.append(output_label) | |
filter_chain = [] | |
if self.ffmpeg_normalize.pre_filter: | |
filter_chain.append(self.ffmpeg_normalize.pre_filter) | |
filter_chain.append(normalization_filter) | |
if self.ffmpeg_normalize.post_filter: | |
filter_chain.append(self.ffmpeg_normalize.post_filter) | |
filter_chains.append(input_label + ",".join(filter_chain) + output_label) | |
filter_complex_cmd = ";".join(filter_chains) | |
return filter_complex_cmd, output_labels | |
def _second_pass(self): | |
""" | |
Construct the second pass command and run it | |
FIXME: make this method simpler | |
""" | |
logger.info(f"Running second pass for {self.input_file}") | |
# get the target output stream types depending on the options | |
output_stream_types = ["audio"] | |
if not self.ffmpeg_normalize.video_disable: | |
output_stream_types.append("video") | |
if not self.ffmpeg_normalize.subtitle_disable: | |
output_stream_types.append("subtitle") | |
# base command, here we will add all other options | |
cmd = [self.ffmpeg_normalize.ffmpeg_exe, "-y", "-nostdin"] | |
# extra options (if any) | |
if self.ffmpeg_normalize.extra_input_options: | |
cmd.extend(self.ffmpeg_normalize.extra_input_options) | |
# get complex filter command | |
audio_filter_cmd, output_labels = self._get_audio_filter_cmd() | |
# add input file and basic filter | |
cmd.extend(["-i", self.input_file, "-filter_complex", audio_filter_cmd]) | |
# map metadata, only if needed | |
if self.ffmpeg_normalize.metadata_disable: | |
cmd.extend(["-map_metadata", "-1"]) | |
else: | |
# map global metadata | |
cmd.extend(["-map_metadata", "0"]) | |
# map per-stream metadata (e.g. language tags) | |
for stream_type in output_stream_types: | |
stream_key = stream_type[0] | |
if stream_type not in self.streams: | |
continue | |
for idx, _ in enumerate(self.streams[stream_type].items()): | |
cmd.extend( | |
[ | |
f"-map_metadata:s:{stream_key}:{idx}", | |
f"0:s:{stream_key}:{idx}", | |
] | |
) | |
# map chapters if needed | |
if self.ffmpeg_normalize.chapters_disable: | |
cmd.extend(["-map_chapters", "-1"]) | |
else: | |
cmd.extend(["-map_chapters", "0"]) | |
# collect all '-map' and codecs needed for output video based on input video | |
if not self.ffmpeg_normalize.video_disable: | |
for s in self.streams["video"].keys(): | |
cmd.extend(["-map", f"0:{s}"]) | |
# set codec (copy by default) | |
cmd.extend(["-c:v", self.ffmpeg_normalize.video_codec]) | |
# ... and map the output of the normalization filters | |
for ol in output_labels: | |
cmd.extend(["-map", ol]) | |
# set audio codec (never copy) | |
if self.ffmpeg_normalize.audio_codec: | |
cmd.extend(["-c:a", self.ffmpeg_normalize.audio_codec]) | |
else: | |
for index, (_, audio_stream) in enumerate(self.streams["audio"].items()): | |
cmd.extend([f"-c:a:{index}", audio_stream.get_pcm_codec()]) | |
# other audio options (if any) | |
if self.ffmpeg_normalize.audio_bitrate: | |
cmd.extend(["-b:a", str(self.ffmpeg_normalize.audio_bitrate)]) | |
if self.ffmpeg_normalize.sample_rate: | |
cmd.extend(["-ar", str(self.ffmpeg_normalize.sample_rate)]) | |
else: | |
if self.ffmpeg_normalize.normalization_type == "ebu": | |
logger.warn( | |
"The sample rate will automatically be set to 192 kHz by the loudnorm filter. " | |
"Specify -ar/--sample-rate to override it." | |
) | |
# ... and subtitles | |
if not self.ffmpeg_normalize.subtitle_disable: | |
for s in self.streams["subtitle"].keys(): | |
cmd.extend(["-map", f"0:{s}"]) | |
# copy subtitles | |
cmd.extend(["-c:s", "copy"]) | |
if self.ffmpeg_normalize.keep_original_audio: | |
highest_index = len(self.streams["audio"]) | |
for index, (_, s) in enumerate(self.streams["audio"].items()): | |
cmd.extend(["-map", f"0:a:{index}"]) | |
cmd.extend([f"-c:a:{highest_index + index}", "copy"]) | |
# extra options (if any) | |
if self.ffmpeg_normalize.extra_output_options: | |
cmd.extend(self.ffmpeg_normalize.extra_output_options) | |
# output format (if any) | |
if self.ffmpeg_normalize.output_format: | |
cmd.extend(["-f", self.ffmpeg_normalize.output_format]) | |
# if dry run, only show sample command | |
if self.ffmpeg_normalize.dry_run: | |
cmd.append(self.output_file) | |
cmd_runner = CommandRunner(cmd, dry=True) | |
cmd_runner.run_command() | |
yield 100 | |
return | |
# create a temporary output file name | |
temp_dir = tempfile.gettempdir() | |
output_file_suffix = os.path.splitext(self.output_file)[1] | |
temp_file_name = os.path.join( | |
temp_dir, next(tempfile._get_candidate_names()) + output_file_suffix | |
) | |
cmd.append(temp_file_name) | |
# run the actual command | |
try: | |
cmd_runner = CommandRunner(cmd) | |
try: | |
for progress in cmd_runner.run_ffmpeg_command(): | |
yield progress | |
except Exception as e: | |
logger.error( | |
"Error while running command {}! Error: {}".format( | |
" ".join([shlex.quote(c) for c in cmd]), e | |
) | |
) | |
raise e | |
else: | |
# move file from TMP to output file | |
logger.debug( | |
f"Moving temporary file from {temp_file_name} to {self.output_file}" | |
) | |
shutil.move(temp_file_name, self.output_file) | |
except Exception as e: | |
# remove dangling temporary file | |
if os.path.isfile(temp_file_name): | |
os.remove(temp_file_name) | |
raise e | |
logger.debug("Normalization finished") | |