import argparse
import os
import subprocess
from datetime import datetime, timedelta
from pathlib import Path
from multiprocessing import Pool
import pandas as pd
from tqdm import tqdm
from utils.logger import logger
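
# Clips shorter than MIN_SECONDS are discarded; clips longer than MAX_SECONDS are split into
# windows of at most MAX_SECONDS seconds.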
MIN_SECONDS = int(os.getenv("MIN_SECONDS", 3))
MAX_SECONDS = int(os.getenv("MAX_SECONDS", 10))


def get_command(start_time, video_path, video_duration, output_path):
# Use FFmpeg to split the video. Re-encoding is needed to ensure the accuracy of the clip
# at the cost of consuming computational resources.
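    # Note: placing -ss before -i makes FFmpeg seek on the input stream; because the clip is
    # re-encoded, the cut remains frame-accurate on modern FFmpeg builds.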
return [
'ffmpeg',
'-hide_banner',
'-loglevel', 'panic',
'-ss', str(start_time.time()),
'-i', video_path,
'-t', str(video_duration),
'-c:v', 'libx264',
'-preset', 'veryfast',
'-crf', '22',
'-c:a', 'aac',
'-sn',
output_path
]


def clip_video_star(args):
    # Unpack the argument tuple for multiprocessing.Pool.imap.
    return clip_video(*args)


def clip_video(video_path, timecode_list, output_folder, video_duration):
"""Recursively clip the video within the range of [MIN_SECONDS, MAX_SECONDS],
according to the timecode obtained from cogvideox/video_caption/cutscene_detect.py.
"""
try:
video_name = Path(video_path).stem
        if len(timecode_list) == 0:  # The video consists of a single scene: split it into windows of at most MAX_SECONDS seconds.
splitted_timecode_list = []
start_time = datetime.strptime("00:00:00.000", "%H:%M:%S.%f")
end_time = datetime.strptime(video_duration, "%H:%M:%S.%f")
cur_start = start_time
splitted_index = 0
while cur_start < end_time:
cur_end = min(cur_start + timedelta(seconds=MAX_SECONDS), end_time)
cur_video_duration = (cur_end - cur_start).total_seconds()
if cur_video_duration < MIN_SECONDS:
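                    # The remaining window (typically the tail of the video) is shorter than MIN_SECONDS; skip it.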
cur_start = cur_end
splitted_index += 1
continue
splitted_timecode_list.append([cur_start.strftime("%H:%M:%S.%f")[:-3], cur_end.strftime("%H:%M:%S.%f")[:-3]])
output_path = os.path.join(output_folder, video_name + f"_{splitted_index}.mp4")
if os.path.exists(output_path):
logger.info(f"The clipped video {output_path} exists.")
cur_start = cur_end
splitted_index += 1
continue
else:
command = get_command(cur_start, video_path, cur_video_duration, output_path)
try:
subprocess.run(command, check=True)
except Exception as e:
logger.warning(f"Run {command} error: {e}.")
finally:
cur_start = cur_end
splitted_index += 1
        for i, timecode in enumerate(timecode_list):  # The video consists of multiple scenes.
start_time = datetime.strptime(timecode[0], "%H:%M:%S.%f")
end_time = datetime.strptime(timecode[1], "%H:%M:%S.%f")
video_duration = (end_time - start_time).total_seconds()
output_path = os.path.join(output_folder, video_name + f"_{i}.mp4")
if os.path.exists(output_path):
logger.info(f"The clipped video {output_path} exists.")
continue
if video_duration < MIN_SECONDS:
continue
if video_duration > MAX_SECONDS:
splitted_timecode_list = []
cur_start = start_time
splitted_index = 0
while cur_start < end_time:
cur_end = min(cur_start + timedelta(seconds=MAX_SECONDS), end_time)
cur_video_duration = (cur_end - cur_start).total_seconds()
if cur_video_duration < MIN_SECONDS:
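                        # The remaining tail of this scene is shorter than MIN_SECONDS; drop it.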
break
splitted_timecode_list.append([cur_start.strftime("%H:%M:%S.%f")[:-3], cur_end.strftime("%H:%M:%S.%f")[:-3]])
splitted_output_path = os.path.join(output_folder, video_name + f"_{i}_{splitted_index}.mp4")
if os.path.exists(splitted_output_path):
logger.info(f"The clipped video {splitted_output_path} exists.")
cur_start = cur_end
splitted_index += 1
continue
else:
command = get_command(cur_start, video_path, cur_video_duration, splitted_output_path)
try:
subprocess.run(command, check=True)
except Exception as e:
logger.warning(f"Run {command} error: {e}.")
finally:
cur_start = cur_end
splitted_index += 1
continue
            # The scene detected by PySceneDetect occasionally includes a few frames from the
            # next scene, so discard the last 0.5 seconds of the current scene.
            video_duration = video_duration - 0.5
command = get_command(start_time, video_path, video_duration, output_path)
subprocess.run(command, check=True)
except Exception as e:
logger.warning(f"Clip video with {video_path}. Error is: {e}.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Video Splitting")
    parser.add_argument(
        "--video_metadata_path",
        type=str,
        default=None,
        help="The path to the video dataset metadata (a jsonl file with frame_size, timecode_list and duration columns).",
    )
parser.add_argument(
"--video_path_column",
type=str,
default="video_path",
help="The column contains the video path (an absolute path or a relative path w.r.t the video_folder).",
)
parser.add_argument("--video_folder", type=str, default="", help="The video folder.")
parser.add_argument("--output_folder", type=str, default="outputs")
parser.add_argument("--n_jobs", type=int, default=16)
parser.add_argument("--resolution_threshold", type=float, default=0, help="The resolution threshold.")
args = parser.parse_args()
video_metadata_df = pd.read_json(args.video_metadata_path, lines=True)
num_videos = len(video_metadata_df)
video_metadata_df["resolution"] = video_metadata_df["frame_size"].apply(lambda x: x[0] * x[1])
video_metadata_df = video_metadata_df[video_metadata_df["resolution"] >= args.resolution_threshold]
logger.info(f"Filter {num_videos - len(video_metadata_df)} videos with resolution smaller than {args.resolution_threshold}.")
video_path_list = video_metadata_df[args.video_path_column].to_list()
video_id_list = [Path(video_path).stem for video_path in video_path_list]
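    # Output clip names are derived from the video file stems, so duplicate stems would overwrite each other's clips.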
if len(video_id_list) != len(list(set(video_id_list))):
logger.warning("Duplicate file names exist in the input video path list.")
video_path_list = [os.path.join(args.video_folder, video_path) for video_path in video_path_list]
video_timecode_list = video_metadata_df["timecode_list"].to_list()
video_duration_list = video_metadata_df["duration"].to_list()
assert len(video_path_list) == len(video_timecode_list)
os.makedirs(args.output_folder, exist_ok=True)
args_list = [
(video_path, timecode_list, args.output_folder, video_duration)
for video_path, timecode_list, video_duration in zip(
video_path_list, video_timecode_list, video_duration_list
)
]
    with Pool(args.n_jobs) as pool:
        # Consume the lazy imap iterator so that tqdm reports progress as each video finishes.
        for _ in tqdm(pool.imap(clip_video_star, args_list), total=len(video_path_list)):
            pass
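

# Example invocation (the metadata and folder paths below are illustrative placeholders,
# not taken from the source):
#
# python video_splitting.py \
#     --video_metadata_path path/to/meta_scene_info.jsonl \
#     --video_path_column video_path \
#     --video_folder path/to/videos \
#     --output_folder path/to/output_clips \
#     --n_jobs 16 \
#     --resolution_threshold 921600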