File size: 4,345 Bytes
208b0eb |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
import argparse
import os
from copy import deepcopy
from pathlib import Path
from multiprocessing import Pool
import pandas as pd
from scenedetect import open_video, SceneManager
from scenedetect.detectors import ContentDetector
from tqdm import tqdm
from utils.logger import logger
def cutscene_detection_star(args):
    """Unpack one argument tuple and forward it to ``cutscene_detection``.

    The script maps this adapter over a list of tuples with ``Pool.imap``,
    which passes each worker call a single object; this function spreads
    that object back out into positional arguments.

    Args:
        args: Iterable of positional arguments for ``cutscene_detection``,
            in signature order.

    Returns:
        Whatever ``cutscene_detection`` returns for those arguments.
    """
    outcome = cutscene_detection(*args)
    return outcome
def cutscene_detection(video_path, saved_path, cutscene_threshold=27, min_scene_len=15):
    """Detect scene cuts in one video and write them to a JSON-lines file.

    The video is scanned with PySceneDetect's ``ContentDetector``. Every scene
    boundary (start or end) is collected, de-duplicated by frame index and
    sorted; consecutive boundaries are then paired into ``(start, end)``
    timecode tuples. A single record holding the video file name, the timecode
    list, the frame rate, the frame size and the duration is written to
    ``saved_path``.

    Args:
        video_path: Path of the video to analyze.
        saved_path: Output file path. If it already exists the video is
            considered processed and nothing is done.
        cutscene_threshold: ``threshold`` passed to ``ContentDetector``.
        min_scene_len: ``min_scene_len`` passed to ``ContentDetector``.

    Returns:
        None. Results are written to ``saved_path``. Any exception raised
        while processing is caught and logged as a warning, so one bad video
        does not abort a batch run.
    """
    try:
        if os.path.exists(saved_path):
            logger.info(f"{video_path} has been processed.")
            return

        # Use PyAV as the backend to avoid (to some extent) containing the last frame of the previous scene.
        # https://github.com/Breakthrough/PySceneDetect/issues/279#issuecomment-2152596761.
        video = open_video(video_path, backend="pyav")
        frame_rate, frame_size = video.frame_rate, video.frame_size
        duration = deepcopy(video.duration)

        scene_manager = SceneManager()
        scene_manager.add_detector(
            # [ContentDetector, ThresholdDetector, AdaptiveDetector]
            ContentDetector(threshold=cutscene_threshold, min_scene_len=min_scene_len)
        )
        scene_manager.detect_scenes(video, show_progress=False)
        scene_list = scene_manager.get_scene_list()

        # Map each boundary frame index to the first timecode seen for it.
        # The dict provides O(1) de-duplication (the end of one scene is
        # usually the start of the next) instead of scanning a list.
        frame_timecode = {}
        for scene in scene_list:
            for frame_time_code in scene:
                frame_timecode.setdefault(frame_time_code.get_frames(), frame_time_code)
        del video, scene_manager

        # Pair every sorted boundary with its successor to form the scenes.
        frame_points = sorted(frame_timecode)
        timecode_list = [
            (frame_timecode[start].get_timecode(), frame_timecode[end].get_timecode())
            for start, end in zip(frame_points, frame_points[1:])
        ]

        meta_scene = [{
            "video_path": Path(video_path).name,
            "timecode_list": timecode_list,
            # NOTE(review): the key is spelled "fram_rate"; it is kept as-is
            # because readers of these files may depend on it.
            "fram_rate": frame_rate,
            "frame_size": frame_size,
            "duration": str(duration),  # stored in its string form
        }]
        pd.DataFrame(meta_scene).to_json(saved_path, orient="records", lines=True)
    except Exception as e:
        logger.warning(f"Cutscene detection with {video_path} failed. Error is: {e}.")
if __name__ == "__main__":
    # Command-line entry point: read the video list from a metadata file and
    # run cutscene detection on every video, writing one jsonl file per video.
    parser = argparse.ArgumentParser(description="Cutscene Detection")
    parser.add_argument(
        "--video_metadata_path", type=str, required=True, help="The path to the video dataset metadata (csv/jsonl)."
    )
    parser.add_argument(
        "--video_path_column",
        type=str,
        default="video_path",
        help="The column contains the video path (an absolute path or a relative path w.r.t the video_folder).",
    )
    parser.add_argument("--video_folder", type=str, default="", help="The video folder.")
    parser.add_argument("--saved_folder", type=str, required=True, help="The save path to the output results (csv/jsonl).")
    parser.add_argument("--n_jobs", type=int, default=1, help="The number of processes.")

    args = parser.parse_args()

    # The help text advertises csv/jsonl metadata, so pick the reader from the
    # file extension; anything that is not .csv is read as JSON lines, as before.
    if args.video_metadata_path.endswith(".csv"):
        metadata_df = pd.read_csv(args.video_metadata_path)
    else:
        metadata_df = pd.read_json(args.video_metadata_path, lines=True)

    video_path_list = [
        os.path.join(args.video_folder, video_path)
        for video_path in metadata_df[args.video_path_column].tolist()
    ]

    # exist_ok=True already covers the "folder exists" case.
    os.makedirs(args.saved_folder, exist_ok=True)

    # The glob can be slow when there are many small jsonl files.
    # Each output file is named after the stem of its video file.
    saved_path_list = [
        os.path.join(args.saved_folder, Path(video_path).stem + ".jsonl")
        for video_path in video_path_list
    ]
    args_list = list(zip(video_path_list, saved_path_list))

    # Since the length of the video is not uniform, the gather operation is not performed.
    # We need to run easyanimate/video_caption/utils/gather_jsonl.py after the program finished.
    with Pool(args.n_jobs) as pool:
        # The workers return nothing useful; iterate only to drive the
        # progress bar and wait for every task to complete.
        for _ in tqdm(pool.imap(cutscene_detection_star, args_list), total=len(args_list)):
            pass
|