Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
File size: 5,293 Bytes
19c8b95 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
from __future__ import division
import os
import sys
import subprocess
from platform import system as _current_os
import re
from ffmpeg_progress_yield import FfmpegProgress
from ._errors import FFmpegNormalizeError
from ._logger import setup_custom_logger
logger = setup_custom_logger("ffmpeg_normalize")
CUR_OS = _current_os()
IS_WIN = CUR_OS in ["Windows", "cli"]
IS_NIX = (not IS_WIN) and any(
CUR_OS.startswith(i)
for i in ["CYGWIN", "MSYS", "Linux", "Darwin", "SunOS", "FreeBSD", "NetBSD"]
)
NUL = "NUL" if IS_WIN else "/dev/null"
DUR_REGEX = re.compile(
r"Duration: (?P<hour>\d{2}):(?P<min>\d{2}):(?P<sec>\d{2})\.(?P<ms>\d{2})"
)
# https://gist.github.com/Hellowlol/5f8545e999259b4371c91ac223409209
def to_ms(s=None, des=None, **kwargs):
if s:
hour = int(s[0:2])
minute = int(s[3:5])
sec = int(s[6:8])
ms = int(s[10:11])
else:
hour = int(kwargs.get("hour", 0))
minute = int(kwargs.get("min", 0))
sec = int(kwargs.get("sec", 0))
ms = int(kwargs.get("ms"))
result = (hour * 60 * 60 * 1000) + (minute * 60 * 1000) + (sec * 1000) + ms
if des and isinstance(des, int):
return round(result, des)
return result
class CommandRunner:
def __init__(self, cmd, dry=False):
self.cmd = cmd
self.dry = dry
self.output = None
def run_ffmpeg_command(self):
# wrapper for 'ffmpeg-progress-yield'
ff = FfmpegProgress(self.cmd, dry_run=self.dry)
for progress in ff.run_command_with_progress():
yield progress
self.output = ff.stderr
def run_command(self):
logger.debug(f"Running command: {self.cmd}")
if self.dry:
logger.debug("Dry mode specified, not actually running command")
return
p = subprocess.Popen(
self.cmd,
stdin=subprocess.PIPE, # Apply stdin isolation by creating separate pipe.
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=False,
)
# simple running of command
stdout, stderr = p.communicate()
stdout = stdout.decode("utf8", errors="replace")
stderr = stderr.decode("utf8", errors="replace")
if p.returncode == 0:
self.output = stdout + stderr
else:
raise RuntimeError(
f"Error running command {self.cmd}: {str(stderr)}"
)
def get_output(self):
return self.output
def which(program):
"""
Find a program in PATH and return path
From: http://stackoverflow.com/q/377017/
"""
def is_exe(fpath):
found = os.path.isfile(fpath) and os.access(fpath, os.X_OK)
if not found and sys.platform == "win32":
fpath = fpath + ".exe"
found = os.path.isfile(fpath) and os.access(fpath, os.X_OK)
return found
fpath, _ = os.path.split(program)
if fpath:
if is_exe(program):
logger.debug("found executable: " + str(program))
return program
else:
for path in os.environ["PATH"].split(os.pathsep):
path = os.path.expandvars(os.path.expanduser(path)).strip('"')
exe_file = os.path.join(path, program)
if is_exe(exe_file):
logger.debug("found executable in path: " + str(exe_file))
return exe_file
return None
def dict_to_filter_opts(opts):
filter_opts = []
for k, v in opts.items():
filter_opts.append(f"{k}={v}")
return ":".join(filter_opts)
# def get_ffmpeg_exe():
# """
# Return path to ffmpeg executable
# """
# ffmpeg_path = os.getenv("FFMPEG_PATH")
# if ffmpeg_path:
# if os.sep in ffmpeg_path:
# ffmpeg_exe = ffmpeg_path
# if not os.path.isfile(ffmpeg_exe):
# raise FFmpegNormalizeError(f"No file exists at {ffmpeg_exe}")
# else:
# ffmpeg_exe = which(ffmpeg_path)
# if not ffmpeg_exe:
# raise FFmpegNormalizeError(
# f"Could not find '{ffmpeg_path}' in your $PATH."
# )
# else:
# ffmpeg_exe = which("ffmpeg")
# if not ffmpeg_exe:
# if which("avconv"):
# raise FFmpegNormalizeError(
# "avconv is not supported. "
# "Please install ffmpeg from http://ffmpeg.org instead."
# )
# else:
# raise FFmpegNormalizeError(
# "Could not find ffmpeg in your $PATH or $FFMPEG_PATH. "
# "Please install ffmpeg from http://ffmpeg.org"
# )
# return ffmpeg_exe
def ffmpeg_has_loudnorm(ffmpeg_path):
"""
Run feature detection on ffmpeg, returns True if ffmpeg supports
the loudnorm filter
"""
# cmd_runner = CommandRunner([get_ffmpeg_exe(), "-filters"])
cmd_runner = CommandRunner([ffmpeg_path, "-filters"])
cmd_runner.run_command()
output = cmd_runner.get_output()
if "loudnorm" in output:
return True
else:
logger.error(
"Your ffmpeg version does not support the 'loudnorm' filter. "
"Please make sure you are running ffmpeg v3.1 or above."
)
return False
|