import argparse |
import os |
import subprocess |
from datetime import datetime, timedelta |
from pathlib import Path |
from multiprocessing import Pool |
import pandas as pd |
from tqdm import tqdm |
from utils.logger import logger |
# Lower and upper bounds (in seconds) on the duration of an emitted clip.
# Both can be overridden through environment variables of the same name.
MIN_SECONDS = int(os.environ.get("MIN_SECONDS", 3))
MAX_SECONDS = int(os.environ.get("MAX_SECONDS", 10))
def get_command(start_time, video_path, video_duration, output_path):
    """Build the ffmpeg argument list that extracts one clip from a video.

    Args:
        start_time: Object exposing ``.time()`` (callers pass a ``datetime``);
            its time-of-day part is used as the ``-ss`` seek offset.
        video_path: Path of the input video.
        video_duration: Length of the clip, passed to ``-t`` after ``str()``.
        output_path: Path of the clip to write.

    Returns:
        A list of strings suitable for ``subprocess.run``.
    """
    seek_offset = str(start_time.time())
    clip_length = str(video_duration)

    command = ["ffmpeg", "-hide_banner", "-loglevel", "panic"]
    # -ss is placed before -i, so it applies to the input file.
    command += ["-ss", seek_offset, "-i", video_path, "-t", clip_length]
    # Re-encode video with x264 and audio with AAC.
    command += ["-c:v", "libx264", "-preset", "veryfast", "-crf", "22"]
    command += ["-c:a", "aac", "-sn"]
    command.append(output_path)
    return command
def clip_video_star(args):
    """Unpack one argument tuple and forward it to ``clip_video``.

    Lets ``clip_video`` be driven by single-argument mappers such as
    ``Pool.imap``.
    """
    outcome = clip_video(*args)
    return outcome
def _run_clip_command(command):
    """Run one ffmpeg command, logging a warning instead of raising on failure."""
    try:
        subprocess.run(command, check=True)
    except Exception as e:
        # Best-effort: one clip failing to encode must not stop the remaining clips.
        logger.warning(f"Run {command} error: {e}.")


def _clip_in_chunks(video_path, output_folder, clip_stem, start_time, end_time):
    """Cut [start_time, end_time) into consecutive clips of at most MAX_SECONDS.

    Clips are written to ``<output_folder>/<clip_stem>_<index>.mp4``. Clips that
    already exist on disk are kept, and a trailing remainder shorter than
    MIN_SECONDS is dropped.
    """
    cur_start = start_time
    chunk_index = 0
    while cur_start < end_time:
        cur_end = min(cur_start + timedelta(seconds=MAX_SECONDS), end_time)
        chunk_seconds = (cur_end - cur_start).total_seconds()
        if chunk_seconds < MIN_SECONDS:
            # A short chunk means cur_end was clamped to end_time (or that
            # MAX_SECONDS < MIN_SECONDS), so no later chunk can qualify either.
            break
        output_path = os.path.join(output_folder, f"{clip_stem}_{chunk_index}.mp4")
        if os.path.exists(output_path):
            logger.info(f"The clipped video {output_path} exists.")
        else:
            _run_clip_command(get_command(cur_start, video_path, chunk_seconds, output_path))
        cur_start = cur_end
        chunk_index += 1


def clip_video(video_path, timecode_list, output_folder, video_duration):
    """Clip a video into segments whose length lies within [MIN_SECONDS, MAX_SECONDS],
    according to the timecode obtained from cogvideox/video_caption/cutscene_detect.py.

    Args:
        video_path: Path of the source video; its stem prefixes every output name.
        timecode_list: Sequence of ``[start, end]`` pairs formatted as
            ``%H:%M:%S.%f``. When empty, the whole video is treated as one scene.
        output_folder: Directory that receives the clips.
        video_duration: Total duration formatted as ``%H:%M:%S.%f``; only read
            when ``timecode_list`` is empty.

    Returns:
        None. Outputs are ``<stem>_<i>.mp4`` for a scene that fits the limits and
        ``<stem>_<i>_<k>.mp4`` for the pieces of a scene longer than MAX_SECONDS
        (``<stem>_<k>.mp4`` when there are no timecodes). Any exception is logged
        as a warning rather than propagated.
    """
    try:
        video_name = Path(video_path).stem

        if len(timecode_list) == 0:
            # No scene cuts available: split the full video into fixed-size chunks.
            start_time = datetime.strptime("00:00:00.000", "%H:%M:%S.%f")
            end_time = datetime.strptime(video_duration, "%H:%M:%S.%f")
            _clip_in_chunks(video_path, output_folder, video_name, start_time, end_time)
            return

        for i, timecode in enumerate(timecode_list):
            start_time = datetime.strptime(timecode[0], "%H:%M:%S.%f")
            end_time = datetime.strptime(timecode[1], "%H:%M:%S.%f")
            scene_seconds = (end_time - start_time).total_seconds()
            output_path = os.path.join(output_folder, video_name + f"_{i}.mp4")
            if os.path.exists(output_path):
                logger.info(f"The clipped video {output_path} exists.")
                continue
            if scene_seconds < MIN_SECONDS:
                continue
            if scene_seconds > MAX_SECONDS:
                # Scene is too long for a single clip: split it further.
                _clip_in_chunks(video_path, output_folder, f"{video_name}_{i}", start_time, end_time)
                continue

            # The last 0.5 s of the scene is dropped.
            # NOTE(review): presumably this avoids frames bleeding in from the
            # next scene -- confirm against the scene detector's behaviour.
            command = get_command(start_time, video_path, scene_seconds - 0.5, output_path)
            # A failing scene is logged and skipped so the remaining scenes of the
            # same video are still processed (same policy as the chunked path).
            _run_clip_command(command)
    except Exception as e:
        logger.warning(f"Clip video with {video_path}. Error is: {e}.")
def _parse_args():
    """Parse the command-line options of the video splitting script."""
    parser = argparse.ArgumentParser(description="Video Splitting")
    parser.add_argument(
        "--video_metadata_path",
        type=str,
        # Required: without it the script previously failed later, inside pandas.
        required=True,
        help="The path to the video dataset metadata (jsonl).",
    )
    parser.add_argument(
        "--video_path_column",
        type=str,
        default="video_path",
        help="The column contains the video path (an absolute path or a relative path w.r.t the video_folder).",
    )
    parser.add_argument("--video_folder", type=str, default="", help="The video folder.")
    parser.add_argument("--output_folder", type=str, default="outputs")
    parser.add_argument("--n_jobs", type=int, default=16)
    parser.add_argument("--resolution_threshold", type=float, default=0, help="The resolution threshold.")
    return parser.parse_args()


def main():
    """Split every video listed in a JSON Lines metadata file into clips.

    The metadata must provide the columns ``frame_size``, ``timecode_list`` and
    ``duration`` in addition to the configured video path column. Videos whose
    ``frame_size[0] * frame_size[1]`` is below ``--resolution_threshold`` are
    skipped; the rest are clipped in parallel by a pool of ``--n_jobs`` workers.
    """
    args = _parse_args()

    video_metadata_df = pd.read_json(args.video_metadata_path, lines=True)
    num_videos = len(video_metadata_df)
    video_metadata_df["resolution"] = video_metadata_df["frame_size"].apply(lambda x: x[0] * x[1])
    video_metadata_df = video_metadata_df[video_metadata_df["resolution"] >= args.resolution_threshold]
    logger.info(f"Filter {num_videos - len(video_metadata_df)} videos with resolution smaller than {args.resolution_threshold}.")

    video_path_list = video_metadata_df[args.video_path_column].to_list()
    # Output names are derived from the file stem, so equal stems share output files.
    video_id_list = [Path(video_path).stem for video_path in video_path_list]
    if len(video_id_list) != len(set(video_id_list)):
        logger.warning("Duplicate file names exist in the input video path list.")
    video_path_list = [os.path.join(args.video_folder, video_path) for video_path in video_path_list]
    video_timecode_list = video_metadata_df["timecode_list"].to_list()
    video_duration_list = video_metadata_df["duration"].to_list()
    assert len(video_path_list) == len(video_timecode_list)

    os.makedirs(args.output_folder, exist_ok=True)
    args_list = [
        (video_path, timecode_list, args.output_folder, video_duration)
        for video_path, timecode_list, video_duration in zip(
            video_path_list, video_timecode_list, video_duration_list
        )
    ]
    with Pool(args.n_jobs) as pool:
        results = pool.imap(clip_video_star, args_list)
        # Drain the lazy iterator so every task runs and the progress bar advances.
        for _ in tqdm(results, total=len(video_path_list)):
            pass


if __name__ == "__main__":
    main()