# vtesting2/cogvideox/video_caption/cutscene_detect.py
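"""Detect cutscenes in videos with PySceneDetect and write one jsonl file of scene
timecodes per video. The per-video files are merged afterwards with
easyanimate/video_caption/utils/gather_jsonl.py (see the comment at the bottom).
"""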
import argparse
import os
from copy import deepcopy
from pathlib import Path
from multiprocessing import Pool
import pandas as pd
from scenedetect import open_video, SceneManager
from scenedetect.detectors import ContentDetector
from tqdm import tqdm
from utils.logger import logger
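# Helper for multiprocessing.Pool.imap, which passes a single argument:
# unpack the (video_path, saved_path) tuple and forward it to cutscene_detection.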
def cutscene_detection_star(args):
return cutscene_detection(*args)
def cutscene_detection(video_path, saved_path, cutscene_threshold=27, min_scene_len=15):
try:
if os.path.exists(saved_path):
logger.info(f"{video_path} has been processed.")
return
        # Use PyAV as the backend to avoid (to some extent) including the last frame
        # of the previous scene in the next one.
        # https://github.com/Breakthrough/PySceneDetect/issues/279#issuecomment-2152596761.
video = open_video(video_path, backend="pyav")
frame_rate, frame_size = video.frame_rate, video.frame_size
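        # Copy the duration up front: `video` is deleted after detection, but the
        # duration is still written to the output record below.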
duration = deepcopy(video.duration)
frame_points, frame_timecode = [], {}
scene_manager = SceneManager()
        scene_manager.add_detector(
            # Available detectors: ContentDetector, ThresholdDetector, AdaptiveDetector.
            # ContentDetector reports a cut when the average HSV frame difference exceeds
            # `threshold`; `min_scene_len` is the minimum scene length in frames.
            ContentDetector(threshold=cutscene_threshold, min_scene_len=min_scene_len)
        )
scene_manager.detect_scenes(video, show_progress=False)
scene_list = scene_manager.get_scene_list()
for scene in scene_list:
for frame_time_code in scene:
frame_index = frame_time_code.get_frames()
if frame_index not in frame_points:
frame_points.append(frame_index)
frame_timecode[frame_index] = frame_time_code
del video, scene_manager
frame_points = sorted(frame_points)
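        # Pair adjacent boundary frames into (start, end) timecode spans.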
output_scene_list = []
for idx in range(len(frame_points) - 1):
output_scene_list.append((frame_timecode[frame_points[idx]], frame_timecode[frame_points[idx+1]]))
timecode_list = [(frame_timecode_tuple[0].get_timecode(), frame_timecode_tuple[1].get_timecode()) for frame_timecode_tuple in output_scene_list]
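        # Shape of a single output record (all values below are illustrative, not real output):
        # {"video_path": "clip.mp4",
        #  "timecode_list": [["00:00:00.000", "00:00:03.200"], ["00:00:03.200", "00:00:07.500"]],
        #  "frame_rate": 30.0, "frame_size": [1920, 1080], "duration": "00:00:07.500"}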
        meta_scene = [{
            "video_path": Path(video_path).name,
            "timecode_list": timecode_list,
            "frame_rate": frame_rate,
            "frame_size": frame_size,
            "duration": str(duration),  # Serialize the FrameTimecode via its string representation.
        }]
        pd.DataFrame(meta_scene).to_json(saved_path, orient="records", lines=True)
except Exception as e:
logger.warning(f"Cutscene detection with {video_path} failed. Error is: {e}.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Cutscene Detection")
parser.add_argument(
"--video_metadata_path", type=str, required=True, help="The path to the video dataset metadata (csv/jsonl)."
)
parser.add_argument(
"--video_path_column",
type=str,
default="video_path",
help="The column contains the video path (an absolute path or a relative path w.r.t the video_folder).",
)
parser.add_argument("--video_folder", type=str, default="", help="The video folder.")
parser.add_argument("--saved_folder", type=str, required=True, help="The save path to the output results (csv/jsonl).")
parser.add_argument("--n_jobs", type=int, default=1, help="The number of processes.")
args = parser.parse_args()
metadata_df = pd.read_json(args.video_metadata_path, lines=True)
video_path_list = metadata_df[args.video_path_column].tolist()
video_path_list = [os.path.join(args.video_folder, video_path) for video_path in video_path_list]
    os.makedirs(args.saved_folder, exist_ok=True)
# The glob can be slow when there are many small jsonl files.
saved_path_list = [os.path.join(args.saved_folder, Path(video_path).stem + ".jsonl") for video_path in video_path_list]
args_list = [
(video_path, saved_path)
for video_path, saved_path in zip(video_path_list, saved_path_list)
]
    # Since video lengths are not uniform, the per-video results are not gathered here.
    # Run easyanimate/video_caption/utils/gather_jsonl.py after the program finishes to merge them.
with Pool(args.n_jobs) as pool:
results = list(tqdm(pool.imap(cutscene_detection_star, args_list), total=len(video_path_list)))
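
# Example invocation (all paths and the job count below are illustrative, not from the repo):
#   python cutscene_detect.py \
#       --video_metadata_path metadata/videos.jsonl \
#       --video_path_column video_path \
#       --video_folder videos \
#       --saved_folder outputs/cutscenes \
#       --n_jobs 8
# Afterwards, merge the per-video jsonl files with
# easyanimate/video_caption/utils/gather_jsonl.py as noted above.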