# Page residue captured from the web file viewer (not part of the module):
# user: Pendrokar; commit: ed18ebf ("relocate folders"); size: 13.5 kB
import os
import re
import tempfile
import shutil
from tqdm import tqdm
import shlex
from ._streams import AudioStream, VideoStream, SubtitleStream
from ._errors import FFmpegNormalizeError
from ._cmd_utils import NUL, CommandRunner, DUR_REGEX, to_ms
from ._logger import setup_custom_logger
# Module-level logger used by every method of MediaFile below; it is obtained
# from the package's own setup_custom_logger() helper under the name
# "ffmpeg_normalize".
logger = setup_custom_logger("ffmpeg_normalize")
class MediaFile:
    """
    Class that holds a file, its streams and adjustments
    """

    # Output extensions for which only the first audio stream is kept
    _SINGLE_STREAM_EXTENSIONS = (".wav", ".mp3", ".aac")

    def __init__(self, ffmpeg_normalize, input_file, output_file=None):
        """
        Initialize a media file for later normalization.

        Construction immediately calls parse_streams(), which runs the
        configured ffmpeg executable on the input file.

        Arguments:
            ffmpeg_normalize {FFmpegNormalize} -- reference to overall settings
            input_file {str} -- Path to input file

        Keyword Arguments:
            output_file {str} -- Path to output file (default: {None})

        Raises:
            FFmpegNormalizeError -- if the input contains no audio stream
        """
        self.ffmpeg_normalize = ffmpeg_normalize
        self.skip = False
        self.input_file = input_file
        self.output_file = output_file
        # stream objects per type, keyed by the stream index in the input file
        self.streams = {"audio": {}, "video": {}, "subtitle": {}}

        self.parse_streams()

    def _stream_ids(self):
        """
        Return the indices of all streams found so far (audio, video, subtitle)
        """
        return [
            *self.streams["audio"],
            *self.streams["video"],
            *self.streams["subtitle"],
        ]

    def __repr__(self):
        return os.path.basename(self.input_file)

    @staticmethod
    def _parse_stream_id(line):
        """
        Return the stream index from a "Stream #0:N ..." line, or None
        """
        stream_id_match = re.search(r"#0:([\d]+)", line)
        if stream_id_match is None:
            return None
        return int(stream_id_match.group(1))

    @staticmethod
    def _parse_audio_properties(line):
        """
        Return (sample_rate, bit_depth) read from an audio stream line

        Either value is None when the corresponding pattern is not present.
        """
        sample_rate_match = re.search(r"(\d+) Hz", line)
        bit_depth_match = re.search(r"s(\d+)p?,", line)
        sample_rate = int(sample_rate_match.group(1)) if sample_rate_match else None
        bit_depth = int(bit_depth_match.group(1)) if bit_depth_match else None
        return sample_rate, bit_depth

    def _output_supports_single_stream(self):
        """
        Tell whether the output file extension is one of the single-stream ones

        Returns False when no output file was given, so that a MediaFile can be
        constructed with the default output_file=None.
        """
        if self.output_file is None:
            return False
        extension = os.path.splitext(self.output_file)[1].lower()
        return extension in self._SINGLE_STREAM_EXTENSIONS

    def _run_with_progress(self, make_progress_iter, desc):
        """
        Exhaust a progress iterable, mirroring it on a tqdm bar if enabled

        Arguments:
            make_progress_iter {callable} -- zero-argument callable returning
                an iterable of progress values (the bar total is 100)
            desc {str} -- label of the progress bar
        """
        if self.ffmpeg_normalize.progress:
            with tqdm(total=100, position=1, desc=desc) as pbar:
                for progress in make_progress_iter():
                    # tqdm.update() takes an increment, not an absolute value
                    pbar.update(progress - pbar.n)
        else:
            for _ in make_progress_iter():
                pass

    def parse_streams(self):
        """
        Try to parse all input streams from file

        Fills self.streams with AudioStream, VideoStream and SubtitleStream
        objects built from the "Stream #0:N" lines of the ffmpeg output.

        Raises:
            FFmpegNormalizeError -- if no audio stream was found
        """
        logger.debug(f"Parsing streams of {self.input_file}")

        # Copy zero seconds of every stream to the null muxer; only the text
        # output of this command is used below.
        cmd = [
            self.ffmpeg_normalize.ffmpeg_exe,
            "-i",
            self.input_file,
            "-c",
            "copy",
            "-t",
            "0",
            "-map",
            "0",
            "-f",
            "null",
            NUL,
        ]

        cmd_runner = CommandRunner(cmd)
        cmd_runner.run_command()
        output = cmd_runner.get_output()

        logger.debug("Stream parsing command output:")
        logger.debug(output)

        duration = None
        for line in (raw_line.strip() for raw_line in output.split("\n")):
            if "Duration" in line:
                duration_search = DUR_REGEX.search(line)
                if not duration_search:
                    logger.warning("Could not extract duration from input file!")
                else:
                    duration = to_ms(**duration_search.groupdict()) / 1000
                    logger.debug(f"Found duration: {duration} s")

            if not line.startswith("Stream"):
                continue

            stream_id = self._parse_stream_id(line)
            # skip lines without an index and indices that were already seen
            if stream_id is None or stream_id in self._stream_ids():
                continue

            if "Audio" in line:
                logger.debug(f"Found audio stream at index {stream_id}")
                sample_rate, bit_depth = self._parse_audio_properties(line)
                self.streams["audio"][stream_id] = AudioStream(
                    self,
                    self.ffmpeg_normalize,
                    stream_id,
                    sample_rate,
                    bit_depth,
                    duration,
                )
            elif "Video" in line:
                logger.debug(f"Found video stream at index {stream_id}")
                self.streams["video"][stream_id] = VideoStream(
                    self, self.ffmpeg_normalize, stream_id
                )
            elif "Subtitle" in line:
                logger.debug(f"Found subtitle stream at index {stream_id}")
                self.streams["subtitle"][stream_id] = SubtitleStream(
                    self, self.ffmpeg_normalize, stream_id
                )

        if not self.streams["audio"]:
            raise FFmpegNormalizeError(
                f"Input file {self.input_file} does not contain any audio streams"
            )

        if self._output_supports_single_stream() and len(self.streams["audio"]) > 1:
            logger.warning(
                "Output file only supports one stream. "
                "Keeping only first audio stream."
            )
            first_stream = next(iter(self.streams["audio"].values()))
            self.streams["audio"] = {first_stream.stream_id: first_stream}
            self.streams["video"] = {}
            self.streams["subtitle"] = {}

    def run_normalization(self):
        """
        Run both passes: gather loudness statistics, then write the output
        """
        logger.debug(f"Running normalization for {self.input_file}")

        # run the first pass to get loudness stats
        self._first_pass()

        # run the second pass as a whole
        self._run_with_progress(self._second_pass, "Second Pass")

    def _first_pass(self):
        """
        Collect normalization statistics for every audio stream

        If print_stats is set, the statistics of all audio streams are appended
        to the shared stats list of the settings object.
        """
        logger.debug(f"Parsing normalization info for {self.input_file}")

        for index, audio_stream in enumerate(self.streams["audio"].values()):
            if self.ffmpeg_normalize.normalization_type == "ebu":
                parse_stats = audio_stream.parse_loudnorm_stats
            else:
                parse_stats = audio_stream.parse_volumedetect_stats

            self._run_with_progress(
                parse_stats,
                f"Stream {index + 1}/{len(self.streams['audio'])}",
            )

        if self.ffmpeg_normalize.print_stats:
            stats = [
                audio_stream.get_stats()
                for audio_stream in self.streams["audio"].values()
            ]
            self.ffmpeg_normalize.stats.extend(stats)

    def _get_audio_filter_cmd(self):
        """
        Return filter_complex command and output labels needed

        Returns:
            tuple -- (filter_complex string, list of "[normN]" output labels),
                one chain and one label per audio stream
        """
        filter_chains = []
        output_labels = []

        for audio_stream in self.streams["audio"].values():
            if self.ffmpeg_normalize.normalization_type == "ebu":
                normalization_filter = audio_stream.get_second_pass_opts_ebu()
            else:
                normalization_filter = audio_stream.get_second_pass_opts_peakrms()

            input_label = f"[0:{audio_stream.stream_id}]"
            output_label = f"[norm{audio_stream.stream_id}]"
            output_labels.append(output_label)

            # optional pre filter, the normalization itself, optional post filter
            filter_chain = []
            if self.ffmpeg_normalize.pre_filter:
                filter_chain.append(self.ffmpeg_normalize.pre_filter)
            filter_chain.append(normalization_filter)
            if self.ffmpeg_normalize.post_filter:
                filter_chain.append(self.ffmpeg_normalize.post_filter)

            filter_chains.append(input_label + ",".join(filter_chain) + output_label)

        return ";".join(filter_chains), output_labels

    def _build_second_pass_cmd(self):
        """
        Assemble the second pass command, without the trailing output path

        Returns:
            list -- ffmpeg argument list; the caller appends the output file
        """
        settings = self.ffmpeg_normalize

        # get the target output stream types depending on the options
        output_stream_types = ["audio"]
        if not settings.video_disable:
            output_stream_types.append("video")
        if not settings.subtitle_disable:
            output_stream_types.append("subtitle")

        # base command, here we will add all other options
        cmd = [settings.ffmpeg_exe, "-y", "-nostdin"]

        # extra options (if any)
        if settings.extra_input_options:
            cmd.extend(settings.extra_input_options)

        # get complex filter command
        audio_filter_cmd, output_labels = self._get_audio_filter_cmd()

        # add input file and basic filter
        cmd.extend(["-i", self.input_file, "-filter_complex", audio_filter_cmd])

        # map metadata, only if needed
        if settings.metadata_disable:
            cmd.extend(["-map_metadata", "-1"])
        else:
            # map global metadata
            cmd.extend(["-map_metadata", "0"])
            # map per-stream metadata (e.g. language tags)
            for stream_type in output_stream_types:
                stream_key = stream_type[0]
                for idx in range(len(self.streams.get(stream_type, {}))):
                    cmd.extend(
                        [
                            f"-map_metadata:s:{stream_key}:{idx}",
                            f"0:s:{stream_key}:{idx}",
                        ]
                    )

        # map chapters if needed
        cmd.extend(["-map_chapters", "-1" if settings.chapters_disable else "0"])

        # collect all '-map' and codecs needed for output video based on input video
        if not settings.video_disable:
            for stream_id in self.streams["video"]:
                cmd.extend(["-map", f"0:{stream_id}"])
            # set codec (copy by default)
            cmd.extend(["-c:v", settings.video_codec])

        # ... and map the output of the normalization filters
        for output_label in output_labels:
            cmd.extend(["-map", output_label])

        # set audio codec (never copy)
        if settings.audio_codec:
            cmd.extend(["-c:a", settings.audio_codec])
        else:
            for index, audio_stream in enumerate(self.streams["audio"].values()):
                cmd.extend([f"-c:a:{index}", audio_stream.get_pcm_codec()])

        # other audio options (if any)
        if settings.audio_bitrate:
            cmd.extend(["-b:a", str(settings.audio_bitrate)])
        if settings.sample_rate:
            cmd.extend(["-ar", str(settings.sample_rate)])
        elif settings.normalization_type == "ebu":
            logger.warning(
                "The sample rate will automatically be set to 192 kHz by the loudnorm filter. "
                "Specify -ar/--sample-rate to override it."
            )

        # ... and subtitles
        if not settings.subtitle_disable:
            for stream_id in self.streams["subtitle"]:
                cmd.extend(["-map", f"0:{stream_id}"])
            # copy subtitles
            cmd.extend(["-c:s", "copy"])

        # original audio streams are appended after the normalized ones
        if settings.keep_original_audio:
            highest_index = len(self.streams["audio"])
            for index in range(highest_index):
                cmd.extend(["-map", f"0:a:{index}"])
                cmd.extend([f"-c:a:{highest_index + index}", "copy"])

        # extra options (if any)
        if settings.extra_output_options:
            cmd.extend(settings.extra_output_options)

        # output format (if any)
        if settings.output_format:
            cmd.extend(["-f", settings.output_format])

        return cmd

    def _second_pass(self):
        """
        Construct the second pass command and run it

        The output is first written to a file in a private temporary directory
        and only moved to the output path once the command succeeded.

        Yields:
            progress values forwarded from the command runner; a single 100
            in dry-run mode

        Raises:
            FFmpegNormalizeError -- if no output file was specified
        """
        logger.info(f"Running second pass for {self.input_file}")

        if self.output_file is None:
            raise FFmpegNormalizeError(
                f"No output file specified for {self.input_file}"
            )

        cmd = self._build_second_pass_cmd()

        # if dry run, only show sample command
        if self.ffmpeg_normalize.dry_run:
            cmd.append(self.output_file)
            cmd_runner = CommandRunner(cmd, dry=True)
            cmd_runner.run_command()
            yield 100
            return

        # create a temporary output file inside a directory only we own
        output_file_suffix = os.path.splitext(self.output_file)[1]
        temp_dir = tempfile.mkdtemp()
        temp_file_name = os.path.join(temp_dir, "out" + output_file_suffix)
        cmd.append(temp_file_name)

        # run the actual command
        try:
            cmd_runner = CommandRunner(cmd)
            try:
                for progress in cmd_runner.run_ffmpeg_command():
                    yield progress
            except Exception as e:
                quoted_cmd = " ".join([shlex.quote(c) for c in cmd])
                logger.error(f"Error while running command {quoted_cmd}! Error: {e}")
                raise

            # move file from TMP to output file
            logger.debug(
                f"Moving temporary file from {temp_file_name} to {self.output_file}"
            )
            shutil.move(temp_file_name, self.output_file)
        finally:
            # remove the temporary directory and any dangling temporary file,
            # also when the generator is closed before it is exhausted
            shutil.rmtree(temp_dir, ignore_errors=True)

        logger.debug("Normalization finished")