#!/usr/bin/python import sys, os, argparse, pickle, subprocess, cv2, math import numpy as np from shutil import rmtree, copy, copytree from tqdm import tqdm import scenedetect from scenedetect.video_manager import VideoManager from scenedetect.scene_manager import SceneManager from scenedetect.stats_manager import StatsManager from scenedetect.detectors import ContentDetector from scipy.interpolate import interp1d from scipy import signal from ultralytics import YOLO from decord import VideoReader import spaces parser = argparse.ArgumentParser(description="FaceTracker") parser.add_argument('--data_dir', type=str, help='directory to save intermediate temp results') parser.add_argument('--facedet_scale', type=float, default=0.25, help='Scale factor for face detection') parser.add_argument('--crop_scale', type=float, default=0, help='Scale bounding box') parser.add_argument('--min_track', type=int, default=50, help='Minimum facetrack duration') parser.add_argument('--frame_rate', type=int, default=25, help='Frame rate') parser.add_argument('--num_failed_det', type=int, default=25, help='Number of missed detections allowed before tracking is stopped') parser.add_argument('--min_frame_size', type=int, default=64, help='Minimum frame size in pixels') parser.add_argument('--sd_root', type=str, required=True, help='Path to save crops') parser.add_argument('--work_root', type=str, required=True, help='Path to save metadata files') parser.add_argument('--data_root', type=str, required=True, help='Directory containing ONLY full uncropped videos') opt = parser.parse_args() def bb_intersection_over_union(boxA, boxB): xA = max(boxA[0], boxB[0]) yA = max(boxA[1], boxB[1]) xB = min(boxA[2], boxB[2]) yB = min(boxB[3], boxB[3]) interArea = max(0, xB - xA) * max(0, yB - yA) boxAArea = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1]) boxBArea = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1]) iou = interArea / float(boxAArea + boxBArea - interArea) return iou def track_shot(opt, scenefaces): print("Tracking video...") iouThres = 0.5 # Minimum IOU between consecutive face detections tracks = [] while True: track = [] for framefaces in scenefaces: for face in framefaces: if track == []: track.append(face) framefaces.remove(face) elif face['frame'] - track[-1]['frame'] <= opt.num_failed_det: iou = bb_intersection_over_union(face['bbox'], track[-1]['bbox']) if iou > iouThres: track.append(face) framefaces.remove(face) continue else: break if track == []: break elif len(track) > opt.min_track: framenum = np.array([f['frame'] for f in track]) bboxes = np.array([np.array(f['bbox']) for f in track]) frame_i = np.arange(framenum[0], framenum[-1] + 1) bboxes_i = [] for ij in range(0, 4): interpfn = interp1d(framenum, bboxes[:, ij]) bboxes_i.append(interpfn(frame_i)) bboxes_i = np.stack(bboxes_i, axis=1) if max(np.mean(bboxes_i[:, 2] - bboxes_i[:, 0]), np.mean(bboxes_i[:, 3] - bboxes_i[:, 1])) > opt.min_frame_size: tracks.append({'frame': frame_i, 'bbox': bboxes_i}) return tracks def check_folder(folder): if os.path.exists(folder): return True return False def del_folder(folder): if os.path.exists(folder): rmtree(folder) def read_video(o, start_idx): with open(o, 'rb') as o: video_stream = VideoReader(o) if start_idx > 0: video_stream.skip_frames(start_idx) return video_stream def crop_video(opt, track, cropfile, tight_scale=1): print("Cropping video...") fourcc = cv2.VideoWriter_fourcc(*'XVID') vOut = cv2.VideoWriter(cropfile + '.avi', fourcc, opt.frame_rate, (480, 270)) dets = {'x': [], 'y': [], 's': [], 'bbox': track['bbox'], 'frame': track['frame']} for det in track['bbox']: # Reduce the size of the bounding box by a small factor if tighter crops are needed (default -> no reduction in size) width = (det[2] - det[0]) * tight_scale height = (det[3] - det[1]) * tight_scale center_x = (det[0] + det[2]) / 2 center_y = (det[1] + det[3]) / 2 dets['s'].append(max(height, width) / 2) dets['y'].append(center_y) # crop center y dets['x'].append(center_x) # crop center x # Smooth detections dets['s'] = signal.medfilt(dets['s'], kernel_size=13) dets['x'] = signal.medfilt(dets['x'], kernel_size=13) dets['y'] = signal.medfilt(dets['y'], kernel_size=13) videofile = os.path.join(opt.avi_dir, 'video.avi') frame_no_to_start = track['frame'][0] video_stream = cv2.VideoCapture(videofile) video_stream.set(cv2.CAP_PROP_POS_FRAMES, frame_no_to_start) for fidx, frame in enumerate(track['frame']): cs = opt.crop_scale bs = dets['s'][fidx] # Detection box size bsi = int(bs * (1 + 2 * cs)) # Pad videos by this amount image = video_stream.read()[1] frame = np.pad(image, ((bsi, bsi), (bsi, bsi), (0, 0)), 'constant', constant_values=(110, 110)) my = dets['y'][fidx] + bsi # BBox center Y mx = dets['x'][fidx] + bsi # BBox center X face = frame[int(my - bs):int(my + bs * (1 + 2 * cs)), int(mx - bs * (1 + cs)):int(mx + bs * (1 + cs))] vOut.write(cv2.resize(face, (480, 270))) video_stream.release() audiotmp = os.path.join(opt.tmp_dir, 'audio.wav') audiostart = (track['frame'][0]) / opt.frame_rate audioend = (track['frame'][-1] + 1) / opt.frame_rate vOut.release() # ========== CROP AUDIO FILE ========== command = ("ffmpeg -hide_banner -loglevel panic -y -i %s -ss %.3f -to %.3f %s" % (os.path.join(opt.avi_dir, 'audio.wav'), audiostart, audioend, audiotmp)) output = subprocess.call(command, shell=True, stdout=None) copy(audiotmp, cropfile + '.wav') # print('Written %s' % cropfile) # print('Mean pos: x %.2f y %.2f s %.2f' % (np.mean(dets['x']), np.mean(dets['y']), np.mean(dets['s']))) return {'track': track, 'proc_track': dets} @spaces.GPU(duration=200) def inference_video(opt, padding=0): videofile = os.path.join(opt.avi_dir, 'video.avi') vidObj = cv2.VideoCapture(videofile) yolo_model = YOLO("yolov9m.pt") global dets, fidx dets = [] fidx = 0 print("Detecting people in the video using YOLO (slowest step in the pipeline)...") def generate_detections(): global dets, fidx while True: success, image = vidObj.read() if not success: break image_np = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # Perform person detection results = yolo_model(image_np, verbose=False) detections = results[0].boxes dets.append([]) for i, det in enumerate(detections): x1, y1, x2, y2 = det.xyxy[0].detach().cpu().numpy() cls = det.cls[0].detach().cpu().numpy() conf = det.conf[0].detach().cpu().numpy() if int(cls) == 0 and conf>0.7: # Class 0 is 'person' in COCO dataset x1 = max(0, int(x1) - padding) y1 = max(0, int(y1) - padding) x2 = min(image_np.shape[1], int(x2) + padding) y2 = min(image_np.shape[0], int(y2) + padding) dets[-1].append({'frame': fidx, 'bbox': [x1, y1, x2, y2], 'conf': conf}) fidx += 1 yield return dets for _ in tqdm(generate_detections()): pass print("Successfully detected people in the video") savepath = os.path.join(opt.work_dir, 'faces.pckl') with open(savepath, 'wb') as fil: pickle.dump(dets, fil) return dets def scene_detect(opt): print("Detecting scenes in the video...") video_manager = VideoManager([os.path.join(opt.avi_dir, 'video.avi')]) stats_manager = StatsManager() scene_manager = SceneManager(stats_manager) scene_manager.add_detector(ContentDetector()) base_timecode = video_manager.get_base_timecode() video_manager.set_downscale_factor() video_manager.start() scene_manager.detect_scenes(frame_source=video_manager) scene_list = scene_manager.get_scene_list(base_timecode) savepath = os.path.join(opt.work_dir, 'scene.pckl') if scene_list == []: scene_list = [(video_manager.get_base_timecode(), video_manager.get_current_timecode())] with open(savepath, 'wb') as fil: pickle.dump(scene_list, fil) print('%s - scenes detected %d' % (os.path.join(opt.avi_dir, 'video.avi'), len(scene_list))) return scene_list def process_video(file): video_file_name = os.path.basename(file.strip()) sd_dest_folder = opt.sd_root work_dest_folder = opt.work_root del_folder(sd_dest_folder) del_folder(work_dest_folder) setattr(opt, 'videofile', file) if os.path.exists(opt.work_dir): rmtree(opt.work_dir) if os.path.exists(opt.crop_dir): rmtree(opt.crop_dir) if os.path.exists(opt.avi_dir): rmtree(opt.avi_dir) if os.path.exists(opt.frames_dir): rmtree(opt.frames_dir) if os.path.exists(opt.tmp_dir): rmtree(opt.tmp_dir) os.makedirs(opt.work_dir) os.makedirs(opt.crop_dir) os.makedirs(opt.avi_dir) os.makedirs(opt.frames_dir) os.makedirs(opt.tmp_dir) command = ("ffmpeg -hide_banner -loglevel panic -y -i %s -qscale:v 2 -async 1 -r 25 %s" % (opt.videofile, os.path.join(opt.avi_dir, 'video.avi'))) output = subprocess.call(command, shell=True, stdout=None) if output != 0: return command = ("ffmpeg -hide_banner -loglevel panic -y -i %s -ac 1 -vn -acodec pcm_s16le -ar 16000 %s" % (os.path.join(opt.avi_dir, 'video.avi'), os.path.join(opt.avi_dir, 'audio.wav'))) output = subprocess.call(command, shell=True, stdout=None) if output != 0: return faces = inference_video(opt) try: scene = scene_detect(opt) except scenedetect.video_stream.VideoOpenFailure: return allscenes = [] for shot in scene: if shot[1].frame_num - shot[0].frame_num >= opt.min_track: allscenes.append(track_shot(opt, faces[shot[0].frame_num:shot[1].frame_num])) alltracks = [] for sc_num in range(len(allscenes)): vidtracks = [] for ii, track in enumerate(allscenes[sc_num]): os.makedirs(os.path.join(opt.crop_dir, 'scene_'+str(sc_num)), exist_ok=True) vidtracks.append(crop_video(opt, track, os.path.join(opt.crop_dir, 'scene_'+str(sc_num), '%05d' % ii))) alltracks.append(vidtracks) savepath = os.path.join(opt.work_dir, 'tracks.pckl') with open(savepath, 'wb') as fil: pickle.dump(alltracks, fil) rmtree(opt.tmp_dir) rmtree(opt.avi_dir) rmtree(opt.frames_dir) copytree(opt.crop_dir, sd_dest_folder) copytree(opt.work_dir, work_dest_folder) if __name__ == "__main__": file = opt.data_root os.makedirs(opt.sd_root, exist_ok=True) os.makedirs(opt.work_root, exist_ok=True) setattr(opt, 'avi_dir', os.path.join(opt.data_dir, 'pyavi')) setattr(opt, 'tmp_dir', os.path.join(opt.data_dir, 'pytmp')) setattr(opt, 'work_dir', os.path.join(opt.data_dir, 'pywork')) setattr(opt, 'crop_dir', os.path.join(opt.data_dir, 'pycrop')) setattr(opt, 'frames_dir', os.path.join(opt.data_dir, 'pyframes')) process_video(file)