gestsync / preprocess /inference_preprocess.py
sindhuhegde's picture
Update app
4ad47a9
raw
history blame
10.7 kB
#!/usr/bin/python
import sys, os, argparse, pickle, subprocess, cv2, math
import numpy as np
from shutil import rmtree, copy, copytree
from tqdm import tqdm
import scenedetect
from scenedetect.video_manager import VideoManager
from scenedetect.scene_manager import SceneManager
from scenedetect.stats_manager import StatsManager
from scenedetect.detectors import ContentDetector
from scipy.interpolate import interp1d
from scipy import signal
from ultralytics import YOLO
from decord import VideoReader
# ---------------------------------------------------------------------------
# Command-line interface. Parsing happens at module level, so ``opt`` is
# available to every function below.
# ---------------------------------------------------------------------------
parser = argparse.ArgumentParser(description="FaceTracker")

# Scratch location. NOTE: no default and not marked required, so the value is
# None when the flag is omitted; the __main__ section joins it with
# sub-directory names.
parser.add_argument(
    '--data_dir',
    type=str,
    help='directory to save intermediate temp results',
)

# Detection / cropping / tracking parameters.
parser.add_argument(
    '--facedet_scale',
    type=float,
    default=0.25,
    help='Scale factor for face detection',
)
parser.add_argument(
    '--crop_scale',
    type=float,
    default=0,
    help='Scale bounding box',
)
parser.add_argument(
    '--min_track',
    type=int,
    default=50,
    help='Minimum facetrack duration',
)
parser.add_argument(
    '--frame_rate',
    type=int,
    default=25,
    help='Frame rate',
)
parser.add_argument(
    '--num_failed_det',
    type=int,
    default=25,
    help='Number of missed detections allowed before tracking is stopped',
)
parser.add_argument(
    '--min_frame_size',
    type=int,
    default=64,
    help='Minimum frame size in pixels',
)

# Mandatory input / output locations.
parser.add_argument(
    '--sd_root',
    type=str,
    required=True,
    help='Path to save crops',
)
parser.add_argument(
    '--work_root',
    type=str,
    required=True,
    help='Path to save metadata files',
)
parser.add_argument(
    '--data_root',
    type=str,
    required=True,
    help='Directory containing ONLY full uncropped videos',
)

opt = parser.parse_args()
def bb_intersection_over_union(boxA, boxB):
    """Return the intersection-over-union (IoU) of two axis-aligned boxes.

    Args:
        boxA: Box given as ``[x1, y1, x2, y2]`` (indices 0/2 are x, 1/3 are y).
        boxB: Second box in the same layout.

    Returns:
        The overlap area divided by the union area. ``0.0`` is returned when
        the union has no area (both boxes degenerate), instead of dividing
        by zero.
    """
    # Corners of the intersection rectangle.
    xA = max(boxA[0], boxB[0])
    yA = max(boxA[1], boxB[1])
    xB = min(boxA[2], boxB[2])
    # The bottom edge must consider BOTH boxes; comparing boxB with itself
    # over-estimated the overlap whenever boxA ended above boxB.
    yB = min(boxA[3], boxB[3])

    # Clamp each side at zero so disjoint boxes yield an empty intersection.
    interArea = max(0, xB - xA) * max(0, yB - yA)
    boxAArea = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1])
    boxBArea = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1])

    unionArea = float(boxAArea + boxBArea - interArea)
    if unionArea <= 0:
        return 0.0
    return interArea / unionArea
def track_shot(opt, scenefaces):
    """Greedily link per-frame detections of one shot into tracks.

    Args:
        opt: Options object; ``num_failed_det``, ``min_track`` and
            ``min_frame_size`` are read.
        scenefaces: List with one entry per frame, each a list of detection
            dicts carrying at least ``'frame'`` and ``'bbox'``. Detections
            that get assigned to a track are REMOVED from these lists.

    Returns:
        A list of dicts ``{'frame': ..., 'bbox': ...}``: ``'frame'`` holds
        every frame number from the first to the last detection of the track
        and ``'bbox'`` the boxes linearly interpolated onto those frames
        (one row per frame, four columns).
    """
    print("Tracking video...")
    min_iou = 0.5  # Minimum IOU between consecutive face detections
    finished = []

    while True:
        chain = []
        for frame_dets in scenefaces:
            # NOTE: frame_dets is modified while it is being iterated, so the
            # element following a removed detection is skipped in this pass.
            # This mirrors the established behaviour and is kept on purpose.
            for det in frame_dets:
                if not chain:
                    # First unassigned detection seeds a new track.
                    chain.append(det)
                    frame_dets.remove(det)
                elif det['frame'] - chain[-1]['frame'] <= opt.num_failed_det:
                    overlap = bb_intersection_over_union(det['bbox'], chain[-1]['bbox'])
                    if overlap > min_iou:
                        chain.append(det)
                        frame_dets.remove(det)
                else:
                    # Gap since the last linked detection is too large.
                    break

        if not chain:
            # Nothing left to link: every detection has been consumed.
            break
        if len(chain) <= opt.min_track:
            continue

        frame_ids = np.array([d['frame'] for d in chain])
        boxes = np.array([np.array(d['bbox']) for d in chain])
        dense_ids = np.arange(frame_ids[0], frame_ids[-1] + 1)

        # Interpolate each of the four box coordinates over the dense range.
        dense_boxes = np.stack(
            [interp1d(frame_ids, boxes[:, col])(dense_ids) for col in range(4)],
            axis=1,
        )

        mean_width = np.mean(dense_boxes[:, 2] - dense_boxes[:, 0])
        mean_height = np.mean(dense_boxes[:, 3] - dense_boxes[:, 1])
        if max(mean_width, mean_height) > opt.min_frame_size:
            finished.append({'frame': dense_ids, 'bbox': dense_boxes})

    return finished
def check_folder(folder):
    """Return True when ``folder`` exists on disk, False otherwise."""
    return os.path.exists(folder)
def del_folder(folder):
    """Recursively delete ``folder`` if it exists; do nothing otherwise."""
    if not os.path.exists(folder):
        return
    rmtree(folder)
def read_video(o, start_idx):
    """Open the video at path ``o`` with decord and optionally skip frames.

    Args:
        o: Path of the video file; it is opened in binary mode.
        start_idx: Number of frames to skip when greater than zero.

    Returns:
        The ``VideoReader`` built from the opened file.
    """
    with open(o, 'rb') as handle:
        reader = VideoReader(handle)
        # Skipping is done while the file is still open.
        if start_idx > 0:
            reader.skip_frames(start_idx)
    return reader
def crop_video(opt, track, cropfile, tight_scale=1):
    """Write the cropped clip (``.avi``) and its audio (``.wav``) for a track.

    Args:
        opt: Options object; ``avi_dir``, ``tmp_dir``, ``frame_rate`` and
            ``crop_scale`` are read.
        track: Dict with ``'frame'`` (frame numbers) and ``'bbox'`` (one
            ``[x1, y1, x2, y2]`` box per frame).
        cropfile: Output path WITHOUT extension; ``.avi`` and ``.wav`` are
            appended.
        tight_scale: Factor applied to the box width/height before the crop
            half-size is derived (1 keeps the detected size).

    Returns:
        ``{'track': track, 'proc_track': dets}`` where ``dets`` holds the
        median-filtered crop centres (``'x'``, ``'y'``), half-sizes (``'s'``)
        and the original ``'bbox'`` / ``'frame'`` entries.

    Raises:
        RuntimeError: If a frame of the track cannot be read from the video.
    """
    print("Cropping video...")

    dets = {'x': [], 'y': [], 's': [], 'bbox': track['bbox'], 'frame': track['frame']}
    for det in track['bbox']:
        # Reduce the size of the bounding box by a small factor if tighter
        # crops are needed (default -> no reduction in size).
        width = (det[2] - det[0]) * tight_scale
        height = (det[3] - det[1]) * tight_scale
        dets['s'].append(max(height, width) / 2)
        dets['y'].append((det[1] + det[3]) / 2)  # crop center y
        dets['x'].append((det[0] + det[2]) / 2)  # crop center x

    # Smooth detections with a median filter to suppress jitter.
    for key in ('s', 'x', 'y'):
        dets[key] = signal.medfilt(dets[key], kernel_size=13)

    videofile = os.path.join(opt.avi_dir, 'video.avi')
    cs = opt.crop_scale

    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    vOut = cv2.VideoWriter(cropfile + '.avi', fourcc, opt.frame_rate, (480, 270))
    video_stream = cv2.VideoCapture(videofile)
    try:
        # Frames are read sequentially starting at the first frame of the track.
        video_stream.set(cv2.CAP_PROP_POS_FRAMES, track['frame'][0])
        for fidx, frame_no in enumerate(track['frame']):
            bs = dets['s'][fidx]  # Detection box size
            bsi = int(bs * (1 + 2 * cs))  # Pad videos by this amount

            success, image = video_stream.read()
            if not success:
                raise RuntimeError('Failed to read frame %d from %s' % (frame_no, videofile))

            # Pad with a constant grey value so crops near the border stay in bounds.
            padded = np.pad(image, ((bsi, bsi), (bsi, bsi), (0, 0)), 'constant', constant_values=(110, 110))
            my = dets['y'][fidx] + bsi  # BBox center Y
            mx = dets['x'][fidx] + bsi  # BBox center X
            face = padded[int(my - bs):int(my + bs * (1 + 2 * cs)),
                          int(mx - bs * (1 + cs)):int(mx + bs * (1 + cs))]
            vOut.write(cv2.resize(face, (480, 270)))
    finally:
        # Always close both handles, even when reading or writing fails.
        video_stream.release()
        vOut.release()

    # ========== CROP AUDIO FILE ==========
    audiotmp = os.path.join(opt.tmp_dir, 'audio.wav')
    audiostart = track['frame'][0] / opt.frame_rate
    audioend = (track['frame'][-1] + 1) / opt.frame_rate

    # Argument list (no shell) so paths with spaces or shell metacharacters
    # are passed to ffmpeg verbatim.
    command = [
        'ffmpeg', '-hide_banner', '-loglevel', 'panic', '-y',
        '-i', os.path.join(opt.avi_dir, 'audio.wav'),
        '-ss', '%.3f' % audiostart,
        '-to', '%.3f' % audioend,
        audiotmp,
    ]
    subprocess.call(command, stdout=None)

    copy(audiotmp, cropfile + '.wav')

    return {'track': track, 'proc_track': dets}
def inference_video(opt, padding=0):
    """Detect people in every frame of ``<opt.avi_dir>/video.avi`` with YOLO.

    Args:
        opt: Options object; ``avi_dir`` and ``work_dir`` are read.
        padding: Number of pixels added on each side of a detected box
            (clamped to the image borders).

    Returns:
        A list with one entry per frame read. Each entry is a list of dicts
        ``{'frame': index, 'bbox': [x1, y1, x2, y2], 'conf': confidence}``
        for detections of class 0 with confidence above 0.7. The same list is
        pickled to ``<opt.work_dir>/faces.pckl``.
    """
    # ``dets`` and ``fidx`` were published as module-level globals by the
    # previous implementation; that is kept so external code relying on them
    # keeps working.
    global dets, fidx
    dets = []
    fidx = 0

    videofile = os.path.join(opt.avi_dir, 'video.avi')
    yolo_model = YOLO("yolov9m.pt")
    vidObj = cv2.VideoCapture(videofile)

    print("Detecting people in the video using YOLO (slowest step in the pipeline)...")

    def read_frames():
        """Yield frames until the capture reports that no frame is left."""
        while True:
            success, image = vidObj.read()
            if not success:
                return
            yield image

    try:
        for image in tqdm(read_frames()):
            image_np = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

            # Perform person detection
            results = yolo_model(image_np, verbose=False)
            detections = results[0].boxes

            frame_dets = []
            for det in detections:
                x1, y1, x2, y2 = det.xyxy[0].detach().cpu().numpy()
                cls = det.cls[0].detach().cpu().numpy()
                conf = det.conf[0].detach().cpu().numpy()
                if int(cls) == 0 and conf > 0.7:  # Class 0 is 'person' in COCO dataset
                    x1 = max(0, int(x1) - padding)
                    y1 = max(0, int(y1) - padding)
                    x2 = min(image_np.shape[1], int(x2) + padding)
                    y2 = min(image_np.shape[0], int(y2) + padding)
                    frame_dets.append({'frame': fidx, 'bbox': [x1, y1, x2, y2], 'conf': conf})

            dets.append(frame_dets)
            fidx += 1
    finally:
        # The capture handle was previously never released.
        vidObj.release()

    print("Successfully detected people in the video")

    savepath = os.path.join(opt.work_dir, 'faces.pckl')
    with open(savepath, 'wb') as fil:
        pickle.dump(dets, fil)

    return dets
def scene_detect(opt):
    """Split ``<opt.avi_dir>/video.avi`` into scenes with PySceneDetect.

    Args:
        opt: Options object; ``avi_dir`` and ``work_dir`` are read.

    Returns:
        The list of ``(start, end)`` timecode pairs reported by the scene
        manager. When no scene is reported, a single pair spanning from the
        base timecode to the current timecode is returned instead. The list
        is also pickled to ``<opt.work_dir>/scene.pckl``.
    """
    print("Detecting scenes in the video...")

    videofile = os.path.join(opt.avi_dir, 'video.avi')
    video_manager = VideoManager([videofile])
    stats_manager = StatsManager()
    scene_manager = SceneManager(stats_manager)
    scene_manager.add_detector(ContentDetector())
    base_timecode = video_manager.get_base_timecode()

    video_manager.set_downscale_factor()
    try:
        video_manager.start()
        scene_manager.detect_scenes(frame_source=video_manager)
        scene_list = scene_manager.get_scene_list(base_timecode)

        if not scene_list:
            # No cut found: treat the whole video as one scene. The current
            # timecode must be queried before the manager is released.
            scene_list = [(video_manager.get_base_timecode(), video_manager.get_current_timecode())]
    finally:
        # Free the underlying capture; it was previously left open.
        video_manager.release()

    savepath = os.path.join(opt.work_dir, 'scene.pckl')
    with open(savepath, 'wb') as fil:
        pickle.dump(scene_list, fil)

    print('%s - scenes detected %d' % (videofile, len(scene_list)))

    return scene_list
def process_video(file):
    """Run the full pipeline on one video and publish crops and metadata.

    Steps: reset the output and scratch directories, convert the input to
    ``video.avi`` / ``audio.wav`` with ffmpeg, detect people, detect scenes,
    track detections per scene, crop every track, save ``tracks.pckl`` and
    finally copy the crop and work directories to ``opt.sd_root`` and
    ``opt.work_root``.

    Args:
        file: Path of the input video.

    Returns:
        None. The function returns early (after the output folders were
        already deleted) when an ffmpeg call exits with a non-zero status or
        when scene detection raises ``VideoOpenFailure``.
    """
    sd_dest_folder = opt.sd_root
    work_dest_folder = opt.work_root

    # copytree() below requires that the destinations do not exist yet.
    del_folder(sd_dest_folder)
    del_folder(work_dest_folder)

    opt.videofile = file

    # Start from empty scratch directories.
    scratch_dirs = (opt.work_dir, opt.crop_dir, opt.avi_dir, opt.frames_dir, opt.tmp_dir)
    for folder in scratch_dirs:
        del_folder(folder)
    for folder in scratch_dirs:
        os.makedirs(folder)

    avi_video = os.path.join(opt.avi_dir, 'video.avi')
    avi_audio = os.path.join(opt.avi_dir, 'audio.wav')

    # ffmpeg is invoked with argument lists (no shell) so input paths with
    # spaces or shell metacharacters are handled correctly. The output frame
    # rate follows opt.frame_rate (default 25), the same value crop_video()
    # uses for its writer and audio timing.
    command = [
        'ffmpeg', '-hide_banner', '-loglevel', 'panic', '-y',
        '-i', opt.videofile,
        '-qscale:v', '2', '-async', '1', '-r', str(opt.frame_rate),
        avi_video,
    ]
    if subprocess.call(command, stdout=None) != 0:
        return

    # Extract mono 16 kHz PCM audio from the converted video.
    command = [
        'ffmpeg', '-hide_banner', '-loglevel', 'panic', '-y',
        '-i', avi_video,
        '-ac', '1', '-vn', '-acodec', 'pcm_s16le', '-ar', '16000',
        avi_audio,
    ]
    if subprocess.call(command, stdout=None) != 0:
        return

    faces = inference_video(opt)

    try:
        scene = scene_detect(opt)
    except scenedetect.video_stream.VideoOpenFailure:
        return

    # Track only scenes that are at least min_track frames long.
    allscenes = [
        track_shot(opt, faces[shot[0].frame_num:shot[1].frame_num])
        for shot in scene
        if shot[1].frame_num - shot[0].frame_num >= opt.min_track
    ]

    alltracks = []
    for sc_num, scene_tracks in enumerate(allscenes):
        scene_dir = os.path.join(opt.crop_dir, 'scene_' + str(sc_num))
        vidtracks = []
        for ii, track in enumerate(scene_tracks):
            # Created lazily so scenes without tracks leave no empty folder.
            os.makedirs(scene_dir, exist_ok=True)
            vidtracks.append(crop_video(opt, track, os.path.join(scene_dir, '%05d' % ii)))
        alltracks.append(vidtracks)

    savepath = os.path.join(opt.work_dir, 'tracks.pckl')
    with open(savepath, 'wb') as fil:
        pickle.dump(alltracks, fil)

    # Drop the intermediate data, then publish the results.
    rmtree(opt.tmp_dir)
    rmtree(opt.avi_dir)
    rmtree(opt.frames_dir)

    copytree(opt.crop_dir, sd_dest_folder)
    copytree(opt.work_dir, work_dest_folder)
if __name__ == "__main__":
    # Make sure both output roots exist before processing starts.
    for out_root in (opt.sd_root, opt.work_root):
        os.makedirs(out_root, exist_ok=True)

    # Scratch sub-directories, all located under --data_dir.
    for attr_name, subdir in (
        ('avi_dir', 'pyavi'),
        ('tmp_dir', 'pytmp'),
        ('work_dir', 'pywork'),
        ('crop_dir', 'pycrop'),
        ('frames_dir', 'pyframes'),
    ):
        setattr(opt, attr_name, os.path.join(opt.data_dir, subdir))

    # --data_root is handed to process_video() as the input video path.
    file = opt.data_root
    process_video(file)