Spaces:
Running
on
Zero
Running
on
Zero
#!/usr/bin/python | |
import sys, os, argparse, pickle, subprocess, cv2, math | |
import numpy as np | |
from shutil import rmtree, copy, copytree | |
from tqdm import tqdm | |
import scenedetect | |
from scenedetect.video_manager import VideoManager | |
from scenedetect.scene_manager import SceneManager | |
from scenedetect.stats_manager import StatsManager | |
from scenedetect.detectors import ContentDetector | |
from scipy.interpolate import interp1d | |
from scipy import signal | |
from ultralytics import YOLO | |
from decord import VideoReader | |
parser = argparse.ArgumentParser(description="FaceTracker") | |
parser.add_argument('--data_dir', type=str, help='directory to save intermediate temp results') | |
parser.add_argument('--facedet_scale', type=float, default=0.25, help='Scale factor for face detection') | |
parser.add_argument('--crop_scale', type=float, default=0, help='Scale bounding box') | |
parser.add_argument('--min_track', type=int, default=50, help='Minimum facetrack duration') | |
parser.add_argument('--frame_rate', type=int, default=25, help='Frame rate') | |
parser.add_argument('--num_failed_det', type=int, default=25, help='Number of missed detections allowed before tracking is stopped') | |
parser.add_argument('--min_frame_size', type=int, default=64, help='Minimum frame size in pixels') | |
parser.add_argument('--sd_root', type=str, required=True, help='Path to save crops') | |
parser.add_argument('--work_root', type=str, required=True, help='Path to save metadata files') | |
parser.add_argument('--data_root', type=str, required=True, help='Directory containing ONLY full uncropped videos') | |
opt = parser.parse_args() | |
def bb_intersection_over_union(boxA, boxB): | |
xA = max(boxA[0], boxB[0]) | |
yA = max(boxA[1], boxB[1]) | |
xB = min(boxA[2], boxB[2]) | |
yB = min(boxB[3], boxB[3]) | |
interArea = max(0, xB - xA) * max(0, yB - yA) | |
boxAArea = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1]) | |
boxBArea = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1]) | |
iou = interArea / float(boxAArea + boxBArea - interArea) | |
return iou | |
def track_shot(opt, scenefaces): | |
print("Tracking video...") | |
iouThres = 0.5 # Minimum IOU between consecutive face detections | |
tracks = [] | |
while True: | |
track = [] | |
for framefaces in scenefaces: | |
for face in framefaces: | |
if track == []: | |
track.append(face) | |
framefaces.remove(face) | |
elif face['frame'] - track[-1]['frame'] <= opt.num_failed_det: | |
iou = bb_intersection_over_union(face['bbox'], track[-1]['bbox']) | |
if iou > iouThres: | |
track.append(face) | |
framefaces.remove(face) | |
continue | |
else: | |
break | |
if track == []: | |
break | |
elif len(track) > opt.min_track: | |
framenum = np.array([f['frame'] for f in track]) | |
bboxes = np.array([np.array(f['bbox']) for f in track]) | |
frame_i = np.arange(framenum[0], framenum[-1] + 1) | |
bboxes_i = [] | |
for ij in range(0, 4): | |
interpfn = interp1d(framenum, bboxes[:, ij]) | |
bboxes_i.append(interpfn(frame_i)) | |
bboxes_i = np.stack(bboxes_i, axis=1) | |
if max(np.mean(bboxes_i[:, 2] - bboxes_i[:, 0]), np.mean(bboxes_i[:, 3] - bboxes_i[:, 1])) > opt.min_frame_size: | |
tracks.append({'frame': frame_i, 'bbox': bboxes_i}) | |
return tracks | |
def check_folder(folder): | |
if os.path.exists(folder): | |
return True | |
return False | |
def del_folder(folder): | |
if os.path.exists(folder): | |
rmtree(folder) | |
def read_video(o, start_idx): | |
with open(o, 'rb') as o: | |
video_stream = VideoReader(o) | |
if start_idx > 0: | |
video_stream.skip_frames(start_idx) | |
return video_stream | |
def crop_video(opt, track, cropfile, tight_scale=1): | |
print("Cropping video...") | |
fourcc = cv2.VideoWriter_fourcc(*'XVID') | |
vOut = cv2.VideoWriter(cropfile + '.avi', fourcc, opt.frame_rate, (480, 270)) | |
dets = {'x': [], 'y': [], 's': [], 'bbox': track['bbox'], 'frame': track['frame']} | |
for det in track['bbox']: | |
# Reduce the size of the bounding box by a small factor if tighter crops are needed (default -> no reduction in size) | |
width = (det[2] - det[0]) * tight_scale | |
height = (det[3] - det[1]) * tight_scale | |
center_x = (det[0] + det[2]) / 2 | |
center_y = (det[1] + det[3]) / 2 | |
dets['s'].append(max(height, width) / 2) | |
dets['y'].append(center_y) # crop center y | |
dets['x'].append(center_x) # crop center x | |
# Smooth detections | |
dets['s'] = signal.medfilt(dets['s'], kernel_size=13) | |
dets['x'] = signal.medfilt(dets['x'], kernel_size=13) | |
dets['y'] = signal.medfilt(dets['y'], kernel_size=13) | |
videofile = os.path.join(opt.avi_dir, 'video.avi') | |
frame_no_to_start = track['frame'][0] | |
video_stream = cv2.VideoCapture(videofile) | |
video_stream.set(cv2.CAP_PROP_POS_FRAMES, frame_no_to_start) | |
for fidx, frame in enumerate(track['frame']): | |
cs = opt.crop_scale | |
bs = dets['s'][fidx] # Detection box size | |
bsi = int(bs * (1 + 2 * cs)) # Pad videos by this amount | |
image = video_stream.read()[1] | |
frame = np.pad(image, ((bsi, bsi), (bsi, bsi), (0, 0)), 'constant', constant_values=(110, 110)) | |
my = dets['y'][fidx] + bsi # BBox center Y | |
mx = dets['x'][fidx] + bsi # BBox center X | |
face = frame[int(my - bs):int(my + bs * (1 + 2 * cs)), int(mx - bs * (1 + cs)):int(mx + bs * (1 + cs))] | |
vOut.write(cv2.resize(face, (480, 270))) | |
video_stream.release() | |
audiotmp = os.path.join(opt.tmp_dir, 'audio.wav') | |
audiostart = (track['frame'][0]) / opt.frame_rate | |
audioend = (track['frame'][-1] + 1) / opt.frame_rate | |
vOut.release() | |
# ========== CROP AUDIO FILE ========== | |
command = ("ffmpeg -hide_banner -loglevel panic -y -i %s -ss %.3f -to %.3f %s" % (os.path.join(opt.avi_dir, 'audio.wav'), audiostart, audioend, audiotmp)) | |
output = subprocess.call(command, shell=True, stdout=None) | |
copy(audiotmp, cropfile + '.wav') | |
# print('Written %s' % cropfile) | |
# print('Mean pos: x %.2f y %.2f s %.2f' % (np.mean(dets['x']), np.mean(dets['y']), np.mean(dets['s']))) | |
return {'track': track, 'proc_track': dets} | |
def inference_video(opt, padding=0): | |
videofile = os.path.join(opt.avi_dir, 'video.avi') | |
vidObj = cv2.VideoCapture(videofile) | |
yolo_model = YOLO("yolov9m.pt") | |
global dets, fidx | |
dets = [] | |
fidx = 0 | |
print("Detecting people in the video using YOLO (slowest step in the pipeline)...") | |
def generate_detections(): | |
global dets, fidx | |
while True: | |
success, image = vidObj.read() | |
if not success: | |
break | |
image_np = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) | |
# Perform person detection | |
results = yolo_model(image_np, verbose=False) | |
detections = results[0].boxes | |
dets.append([]) | |
for i, det in enumerate(detections): | |
x1, y1, x2, y2 = det.xyxy[0].detach().cpu().numpy() | |
cls = det.cls[0].detach().cpu().numpy() | |
conf = det.conf[0].detach().cpu().numpy() | |
if int(cls) == 0 and conf>0.7: # Class 0 is 'person' in COCO dataset | |
x1 = max(0, int(x1) - padding) | |
y1 = max(0, int(y1) - padding) | |
x2 = min(image_np.shape[1], int(x2) + padding) | |
y2 = min(image_np.shape[0], int(y2) + padding) | |
dets[-1].append({'frame': fidx, 'bbox': [x1, y1, x2, y2], 'conf': conf}) | |
fidx += 1 | |
yield | |
return dets | |
for _ in tqdm(generate_detections()): | |
pass | |
print("Successfully detected people in the video") | |
savepath = os.path.join(opt.work_dir, 'faces.pckl') | |
with open(savepath, 'wb') as fil: | |
pickle.dump(dets, fil) | |
return dets | |
def scene_detect(opt): | |
print("Detecting scenes in the video...") | |
video_manager = VideoManager([os.path.join(opt.avi_dir, 'video.avi')]) | |
stats_manager = StatsManager() | |
scene_manager = SceneManager(stats_manager) | |
scene_manager.add_detector(ContentDetector()) | |
base_timecode = video_manager.get_base_timecode() | |
video_manager.set_downscale_factor() | |
video_manager.start() | |
scene_manager.detect_scenes(frame_source=video_manager) | |
scene_list = scene_manager.get_scene_list(base_timecode) | |
savepath = os.path.join(opt.work_dir, 'scene.pckl') | |
if scene_list == []: | |
scene_list = [(video_manager.get_base_timecode(), video_manager.get_current_timecode())] | |
with open(savepath, 'wb') as fil: | |
pickle.dump(scene_list, fil) | |
print('%s - scenes detected %d' % (os.path.join(opt.avi_dir, 'video.avi'), len(scene_list))) | |
return scene_list | |
def process_video(file): | |
video_file_name = os.path.basename(file.strip()) | |
sd_dest_folder = opt.sd_root | |
work_dest_folder = opt.work_root | |
del_folder(sd_dest_folder) | |
del_folder(work_dest_folder) | |
setattr(opt, 'videofile', file) | |
if os.path.exists(opt.work_dir): | |
rmtree(opt.work_dir) | |
if os.path.exists(opt.crop_dir): | |
rmtree(opt.crop_dir) | |
if os.path.exists(opt.avi_dir): | |
rmtree(opt.avi_dir) | |
if os.path.exists(opt.frames_dir): | |
rmtree(opt.frames_dir) | |
if os.path.exists(opt.tmp_dir): | |
rmtree(opt.tmp_dir) | |
os.makedirs(opt.work_dir) | |
os.makedirs(opt.crop_dir) | |
os.makedirs(opt.avi_dir) | |
os.makedirs(opt.frames_dir) | |
os.makedirs(opt.tmp_dir) | |
command = ("ffmpeg -hide_banner -loglevel panic -y -i %s -qscale:v 2 -async 1 -r 25 %s" % (opt.videofile, | |
os.path.join(opt.avi_dir, | |
'video.avi'))) | |
output = subprocess.call(command, shell=True, stdout=None) | |
if output != 0: | |
return | |
command = ("ffmpeg -hide_banner -loglevel panic -y -i %s -ac 1 -vn -acodec pcm_s16le -ar 16000 %s" % (os.path.join(opt.avi_dir, | |
'video.avi'), | |
os.path.join(opt.avi_dir, | |
'audio.wav'))) | |
output = subprocess.call(command, shell=True, stdout=None) | |
if output != 0: | |
return | |
faces = inference_video(opt) | |
try: | |
scene = scene_detect(opt) | |
except scenedetect.video_stream.VideoOpenFailure: | |
return | |
allscenes = [] | |
for shot in scene: | |
if shot[1].frame_num - shot[0].frame_num >= opt.min_track: | |
allscenes.append(track_shot(opt, faces[shot[0].frame_num:shot[1].frame_num])) | |
alltracks = [] | |
for sc_num in range(len(allscenes)): | |
vidtracks = [] | |
for ii, track in enumerate(allscenes[sc_num]): | |
os.makedirs(os.path.join(opt.crop_dir, 'scene_'+str(sc_num)), exist_ok=True) | |
vidtracks.append(crop_video(opt, track, os.path.join(opt.crop_dir, 'scene_'+str(sc_num), '%05d' % ii))) | |
alltracks.append(vidtracks) | |
savepath = os.path.join(opt.work_dir, 'tracks.pckl') | |
with open(savepath, 'wb') as fil: | |
pickle.dump(alltracks, fil) | |
rmtree(opt.tmp_dir) | |
rmtree(opt.avi_dir) | |
rmtree(opt.frames_dir) | |
copytree(opt.crop_dir, sd_dest_folder) | |
copytree(opt.work_dir, work_dest_folder) | |
if __name__ == "__main__": | |
file = opt.data_root | |
os.makedirs(opt.sd_root, exist_ok=True) | |
os.makedirs(opt.work_root, exist_ok=True) | |
setattr(opt, 'avi_dir', os.path.join(opt.data_dir, 'pyavi')) | |
setattr(opt, 'tmp_dir', os.path.join(opt.data_dir, 'pytmp')) | |
setattr(opt, 'work_dir', os.path.join(opt.data_dir, 'pywork')) | |
setattr(opt, 'crop_dir', os.path.join(opt.data_dir, 'pycrop')) | |
setattr(opt, 'frames_dir', os.path.join(opt.data_dir, 'pyframes')) | |
process_video(file) | |