import os import re import tempfile import shutil from tqdm import tqdm import shlex from ._streams import AudioStream, VideoStream, SubtitleStream from ._errors import FFmpegNormalizeError from ._cmd_utils import NUL, CommandRunner, DUR_REGEX, to_ms from ._logger import setup_custom_logger logger = setup_custom_logger("ffmpeg_normalize") class MediaFile: """ Class that holds a file, its streams and adjustments """ def __init__(self, ffmpeg_normalize, input_file, output_file=None): """ Initialize a media file for later normalization. Arguments: ffmpeg_normalize {FFmpegNormalize} -- reference to overall settings input_file {str} -- Path to input file Keyword Arguments: output_file {str} -- Path to output file (default: {None}) """ self.ffmpeg_normalize = ffmpeg_normalize self.skip = False self.input_file = input_file self.output_file = output_file self.streams = {"audio": {}, "video": {}, "subtitle": {}} self.parse_streams() def _stream_ids(self): return ( list(self.streams["audio"].keys()) + list(self.streams["video"].keys()) + list(self.streams["subtitle"].keys()) ) def __repr__(self): return os.path.basename(self.input_file) def parse_streams(self): """ Try to parse all input streams from file """ logger.debug(f"Parsing streams of {self.input_file}") cmd = [ self.ffmpeg_normalize.ffmpeg_exe, "-i", self.input_file, "-c", "copy", "-t", "0", "-map", "0", "-f", "null", NUL, ] cmd_runner = CommandRunner(cmd) cmd_runner.run_command() output = cmd_runner.get_output() logger.debug("Stream parsing command output:") logger.debug(output) output_lines = [line.strip() for line in output.split("\n")] duration = None for line in output_lines: if "Duration" in line: duration_search = DUR_REGEX.search(line) if not duration_search: logger.warning("Could not extract duration from input file!") else: duration = duration_search.groupdict() duration = to_ms(**duration) / 1000 logger.debug("Found duration: " + str(duration) + " s") if not line.startswith("Stream"): continue stream_id_match = re.search(r"#0:([\d]+)", line) if stream_id_match: stream_id = int(stream_id_match.group(1)) if stream_id in self._stream_ids(): continue else: continue if "Audio" in line: logger.debug(f"Found audio stream at index {stream_id}") sample_rate_match = re.search(r"(\d+) Hz", line) sample_rate = ( int(sample_rate_match.group(1)) if sample_rate_match else None ) bit_depth_match = re.search(r"s(\d+)p?,", line) bit_depth = int(bit_depth_match.group(1)) if bit_depth_match else None self.streams["audio"][stream_id] = AudioStream( self, self.ffmpeg_normalize, stream_id, sample_rate, bit_depth, duration, ) elif "Video" in line: logger.debug(f"Found video stream at index {stream_id}") self.streams["video"][stream_id] = VideoStream( self, self.ffmpeg_normalize, stream_id ) elif "Subtitle" in line: logger.debug(f"Found subtitle stream at index {stream_id}") self.streams["subtitle"][stream_id] = SubtitleStream( self, self.ffmpeg_normalize, stream_id ) if not self.streams["audio"]: raise FFmpegNormalizeError( f"Input file {self.input_file} does not contain any audio streams" ) if ( os.path.splitext(self.output_file)[1].lower() in [".wav", ".mp3", ".aac"] and len(self.streams["audio"].values()) > 1 ): logger.warning( "Output file only supports one stream. " "Keeping only first audio stream." ) first_stream = list(self.streams["audio"].values())[0] self.streams["audio"] = {first_stream.stream_id: first_stream} self.streams["video"] = {} self.streams["subtitle"] = {} def run_normalization(self): logger.debug(f"Running normalization for {self.input_file}") # run the first pass to get loudness stats self._first_pass() # run the second pass as a whole if self.ffmpeg_normalize.progress: with tqdm(total=100, position=1, desc="Second Pass") as pbar: for progress in self._second_pass(): pbar.update(progress - pbar.n) else: for _ in self._second_pass(): pass def _first_pass(self): logger.debug(f"Parsing normalization info for {self.input_file}") for index, audio_stream in enumerate(self.streams["audio"].values()): if self.ffmpeg_normalize.normalization_type == "ebu": fun = getattr(audio_stream, "parse_loudnorm_stats") else: fun = getattr(audio_stream, "parse_volumedetect_stats") if self.ffmpeg_normalize.progress: with tqdm( total=100, position=1, desc=f"Stream {index + 1}/{len(self.streams['audio'].values())}", ) as pbar: for progress in fun(): pbar.update(progress - pbar.n) else: for _ in fun(): pass if self.ffmpeg_normalize.print_stats: stats = [ audio_stream.get_stats() for audio_stream in self.streams["audio"].values() ] self.ffmpeg_normalize.stats.extend(stats) def _get_audio_filter_cmd(self): """ Return filter_complex command and output labels needed """ filter_chains = [] output_labels = [] for audio_stream in self.streams["audio"].values(): if self.ffmpeg_normalize.normalization_type == "ebu": normalization_filter = audio_stream.get_second_pass_opts_ebu() else: normalization_filter = audio_stream.get_second_pass_opts_peakrms() input_label = f"[0:{audio_stream.stream_id}]" output_label = f"[norm{audio_stream.stream_id}]" output_labels.append(output_label) filter_chain = [] if self.ffmpeg_normalize.pre_filter: filter_chain.append(self.ffmpeg_normalize.pre_filter) filter_chain.append(normalization_filter) if self.ffmpeg_normalize.post_filter: filter_chain.append(self.ffmpeg_normalize.post_filter) filter_chains.append(input_label + ",".join(filter_chain) + output_label) filter_complex_cmd = ";".join(filter_chains) return filter_complex_cmd, output_labels def _second_pass(self): """ Construct the second pass command and run it FIXME: make this method simpler """ logger.info(f"Running second pass for {self.input_file}") # get the target output stream types depending on the options output_stream_types = ["audio"] if not self.ffmpeg_normalize.video_disable: output_stream_types.append("video") if not self.ffmpeg_normalize.subtitle_disable: output_stream_types.append("subtitle") # base command, here we will add all other options cmd = [self.ffmpeg_normalize.ffmpeg_exe, "-y", "-nostdin"] # extra options (if any) if self.ffmpeg_normalize.extra_input_options: cmd.extend(self.ffmpeg_normalize.extra_input_options) # get complex filter command audio_filter_cmd, output_labels = self._get_audio_filter_cmd() # add input file and basic filter cmd.extend(["-i", self.input_file, "-filter_complex", audio_filter_cmd]) # map metadata, only if needed if self.ffmpeg_normalize.metadata_disable: cmd.extend(["-map_metadata", "-1"]) else: # map global metadata cmd.extend(["-map_metadata", "0"]) # map per-stream metadata (e.g. language tags) for stream_type in output_stream_types: stream_key = stream_type[0] if stream_type not in self.streams: continue for idx, _ in enumerate(self.streams[stream_type].items()): cmd.extend( [ f"-map_metadata:s:{stream_key}:{idx}", f"0:s:{stream_key}:{idx}", ] ) # map chapters if needed if self.ffmpeg_normalize.chapters_disable: cmd.extend(["-map_chapters", "-1"]) else: cmd.extend(["-map_chapters", "0"]) # collect all '-map' and codecs needed for output video based on input video if not self.ffmpeg_normalize.video_disable: for s in self.streams["video"].keys(): cmd.extend(["-map", f"0:{s}"]) # set codec (copy by default) cmd.extend(["-c:v", self.ffmpeg_normalize.video_codec]) # ... and map the output of the normalization filters for ol in output_labels: cmd.extend(["-map", ol]) # set audio codec (never copy) if self.ffmpeg_normalize.audio_codec: cmd.extend(["-c:a", self.ffmpeg_normalize.audio_codec]) else: for index, (_, audio_stream) in enumerate(self.streams["audio"].items()): cmd.extend([f"-c:a:{index}", audio_stream.get_pcm_codec()]) # other audio options (if any) if self.ffmpeg_normalize.audio_bitrate: cmd.extend(["-b:a", str(self.ffmpeg_normalize.audio_bitrate)]) if self.ffmpeg_normalize.sample_rate: cmd.extend(["-ar", str(self.ffmpeg_normalize.sample_rate)]) else: if self.ffmpeg_normalize.normalization_type == "ebu": logger.warn( "The sample rate will automatically be set to 192 kHz by the loudnorm filter. " "Specify -ar/--sample-rate to override it." ) # ... and subtitles if not self.ffmpeg_normalize.subtitle_disable: for s in self.streams["subtitle"].keys(): cmd.extend(["-map", f"0:{s}"]) # copy subtitles cmd.extend(["-c:s", "copy"]) if self.ffmpeg_normalize.keep_original_audio: highest_index = len(self.streams["audio"]) for index, (_, s) in enumerate(self.streams["audio"].items()): cmd.extend(["-map", f"0:a:{index}"]) cmd.extend([f"-c:a:{highest_index + index}", "copy"]) # extra options (if any) if self.ffmpeg_normalize.extra_output_options: cmd.extend(self.ffmpeg_normalize.extra_output_options) # output format (if any) if self.ffmpeg_normalize.output_format: cmd.extend(["-f", self.ffmpeg_normalize.output_format]) # if dry run, only show sample command if self.ffmpeg_normalize.dry_run: cmd.append(self.output_file) cmd_runner = CommandRunner(cmd, dry=True) cmd_runner.run_command() yield 100 return # create a temporary output file name temp_dir = tempfile.gettempdir() output_file_suffix = os.path.splitext(self.output_file)[1] temp_file_name = os.path.join( temp_dir, next(tempfile._get_candidate_names()) + output_file_suffix ) cmd.append(temp_file_name) # run the actual command try: cmd_runner = CommandRunner(cmd) try: for progress in cmd_runner.run_ffmpeg_command(): yield progress except Exception as e: logger.error( "Error while running command {}! Error: {}".format( " ".join([shlex.quote(c) for c in cmd]), e ) ) raise e else: # move file from TMP to output file logger.debug( f"Moving temporary file from {temp_file_name} to {self.output_file}" ) shutil.move(temp_file_name, self.output_file) except Exception as e: # remove dangling temporary file if os.path.isfile(temp_file_name): os.remove(temp_file_name) raise e logger.debug("Normalization finished")