""" | |
Emotion Detection: | |
Model from: https://github.com/onnx/models/blob/main/vision/body_analysis/emotion_ferplus/model/emotion-ferplus-8.onnx | |
Model name: emotion-ferplus-8.onnx | |
""" | |
import logging
import time
from math import ceil
from pathlib import Path

import av
import cv2
import numpy as np
import streamlit as st
from streamlit_webrtc import WebRtcMode, webrtc_streamer

from sample_utils.download import download_file
from sample_utils.turn import get_ice_servers
HERE = Path(__file__).parent
ROOT = HERE.parent

logger = logging.getLogger(__name__)

ONNX_MODEL_URL = "https://github.com/spmallick/learnopencv/raw/master/Facial-Emotion-Recognition/emotion-ferplus-8.onnx"  # noqa: E501
ONNX_MODEL_LOCAL_PATH = ROOT / "emotion-ferplus-8.onnx"
CAFFE_MODEL_URL = "https://github.com/spmallick/learnopencv/raw/master/Facial-Emotion-Recognition/RFB-320/RFB-320.caffemodel"  # noqa: E501
CAFFE_MODEL_LOCAL_PATH = ROOT / "RFB-320/RFB-320.caffemodel"
PROTOTXT_URL = "https://github.com/spmallick/learnopencv/raw/master/Facial-Emotion-Recognition/RFB-320/RFB-320.prototxt"  # noqa: E501
PROTOTXT_LOCAL_PATH = ROOT / "RFB-320/RFB-320.prototxt.txt"

download_file(ONNX_MODEL_URL, ONNX_MODEL_LOCAL_PATH, expected_size=None)
download_file(CAFFE_MODEL_URL, CAFFE_MODEL_LOCAL_PATH, expected_size=None)
download_file(PROTOTXT_URL, PROTOTXT_LOCAL_PATH, expected_size=None)
# Session-specific caching: keep the loaded networks in st.session_state so they
# are read from disk only once per session, not on every script rerun.
onnx_cache_key = "onnx_model"
caffe_cache_key = "caffe_model"

if onnx_cache_key in st.session_state and caffe_cache_key in st.session_state:
    model = st.session_state[onnx_cache_key]
    net = st.session_state[caffe_cache_key]
else:
    # Emotion classification model (FER+)
    model = cv2.dnn.readNetFromONNX(str(ONNX_MODEL_LOCAL_PATH))
    # Face detection model (RFB-320)
    net = cv2.dnn.readNetFromCaffe(str(PROTOTXT_LOCAL_PATH), str(CAFFE_MODEL_LOCAL_PATH))
    st.session_state[onnx_cache_key] = model
    st.session_state[caffe_cache_key] = net
# Face-detector pre/post-processing constants
image_mean = np.array([127, 127, 127])
image_std = 128.0
iou_threshold = 0.3
center_variance = 0.1
size_variance = 0.2
min_boxes = [
    [10.0, 16.0, 24.0],
    [32.0, 48.0],
    [64.0, 96.0],
    [128.0, 192.0, 256.0],
]
strides = [8.0, 16.0, 32.0, 64.0]
threshold = 0.5

# The FER+ model outputs scores for eight emotion classes.
emotion_dict = {
    0: 'neutral',
    1: 'happiness',
    2: 'surprise',
    3: 'sadness',
    4: 'anger',
    5: 'disgust',
    6: 'fear',
    7: 'contempt'
}
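# Prior (anchor) boxes for the face detector: for each stride the feature map is
# ceil(input_size / stride) cells, and every cell gets one prior per entry in
# min_boxes, stored in normalized center form (cx, cy, w, h).
# Worked example for the 320x240 input used below:
#   40*30*3 + 20*15*2 + 10*8*2 + 5*4*3 = 4420 priors.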
def define_img_size(image_size):
    shrinkage_list = []
    feature_map_w_h_list = []
    for size in image_size:
        feature_map = [int(ceil(size / stride)) for stride in strides]
        feature_map_w_h_list.append(feature_map)
    for _ in range(len(image_size)):
        shrinkage_list.append(strides)
    priors = generate_priors(
        feature_map_w_h_list, shrinkage_list, image_size, min_boxes
    )
    return priors
def generate_priors(
    feature_map_list, shrinkage_list, image_size, min_boxes
):
    priors = []
    for index in range(0, len(feature_map_list[0])):
        scale_w = image_size[0] / shrinkage_list[0][index]
        scale_h = image_size[1] / shrinkage_list[1][index]
        for j in range(0, feature_map_list[1][index]):
            for i in range(0, feature_map_list[0][index]):
                x_center = (i + 0.5) / scale_w
                y_center = (j + 0.5) / scale_h
                for min_box in min_boxes[index]:
                    w = min_box / image_size[0]
                    h = min_box / image_size[1]
                    priors.append([
                        x_center,
                        y_center,
                        w,
                        h
                    ])
    logger.debug("priors nums: %d", len(priors))
    return np.clip(priors, 0.0, 1.0)
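# Greedy ("hard") non-maximum suppression: repeatedly keep the highest-scoring
# remaining box and drop every other box whose IoU with it exceeds iou_threshold.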
def hard_nms(box_scores, iou_threshold, top_k=-1, candidate_size=200):
    scores = box_scores[:, -1]
    boxes = box_scores[:, :-1]
    picked = []
    indexes = np.argsort(scores)
    indexes = indexes[-candidate_size:]
    while len(indexes) > 0:
        current = indexes[-1]
        picked.append(current)
        if 0 < top_k == len(picked) or len(indexes) == 1:
            break
        current_box = boxes[current, :]
        indexes = indexes[:-1]
        rest_boxes = boxes[indexes, :]
        iou = iou_of(
            rest_boxes,
            np.expand_dims(current_box, axis=0),
        )
        indexes = indexes[iou <= iou_threshold]
    return box_scores[picked, :]
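# IoU helpers for corner-form boxes:
#   IoU = overlap_area / (area0 + area1 - overlap_area + eps)
# where eps guards against division by zero for degenerate boxes.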
def area_of(left_top, right_bottom):
    hw = np.clip(right_bottom - left_top, 0.0, None)
    return hw[..., 0] * hw[..., 1]


def iou_of(boxes0, boxes1, eps=1e-5):
    overlap_left_top = np.maximum(boxes0[..., :2], boxes1[..., :2])
    overlap_right_bottom = np.minimum(boxes0[..., 2:], boxes1[..., 2:])
    overlap_area = area_of(overlap_left_top, overlap_right_bottom)
    area0 = area_of(boxes0[..., :2], boxes0[..., 2:])
    area1 = area_of(boxes1[..., :2], boxes1[..., 2:])
    return overlap_area / (area0 + area1 - overlap_area + eps)
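# Post-processing of the detector output: keep boxes whose score for a
# non-background class (class 0 is background) exceeds prob_threshold, apply NMS
# per class, then scale the surviving normalized boxes to pixel coordinates.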
def predict(
    width,
    height,
    confidences,
    boxes,
    prob_threshold,
    iou_threshold=0.3,
    top_k=-1
):
    boxes = boxes[0]
    confidences = confidences[0]
    picked_box_probs = []
    picked_labels = []
    for class_index in range(1, confidences.shape[1]):
        probs = confidences[:, class_index]
        mask = probs > prob_threshold
        probs = probs[mask]
        if probs.shape[0] == 0:
            continue
        subset_boxes = boxes[mask, :]
        box_probs = np.concatenate(
            [subset_boxes, probs.reshape(-1, 1)], axis=1
        )
        box_probs = hard_nms(
            box_probs,
            iou_threshold=iou_threshold,
            top_k=top_k,
        )
        picked_box_probs.append(box_probs)
        picked_labels.extend([class_index] * box_probs.shape[0])
    if not picked_box_probs:
        return np.array([]), np.array([]), np.array([])
    picked_box_probs = np.concatenate(picked_box_probs)
    picked_box_probs[:, 0] *= width
    picked_box_probs[:, 1] *= height
    picked_box_probs[:, 2] *= width
    picked_box_probs[:, 3] *= height
    return (
        picked_box_probs[:, :4].astype(np.int32),
        np.array(picked_labels),
        picked_box_probs[:, 4]
    )
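# SSD-style box decoding: the network regresses offsets relative to the priors,
#   cx = dx * center_variance * prior_w + prior_cx
#   cy = dy * center_variance * prior_h + prior_cy
#   w  = prior_w * exp(dw * size_variance)
#   h  = prior_h * exp(dh * size_variance)
# center_form_to_corner_form then converts (cx, cy, w, h) into (x1, y1, x2, y2).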
def convert_locations_to_boxes(locations, priors, center_variance,
                               size_variance):
    if len(priors.shape) + 1 == len(locations.shape):
        priors = np.expand_dims(priors, 0)
    return np.concatenate([
        locations[..., :2] * center_variance * priors[..., 2:] + priors[..., :2],
        np.exp(locations[..., 2:] * size_variance) * priors[..., 2:]
    ], axis=len(locations.shape) - 1)


def center_form_to_corner_form(locations):
    return np.concatenate(
        [locations[..., :2] - locations[..., 2:] / 2,
         locations[..., :2] + locations[..., 2:] / 2],
        len(locations.shape) - 1
    )
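# streamlit_webrtc calls this for every received video frame and expects a new
# av.VideoFrame in return. Note that the detector priors are recomputed on each
# call; they depend only on the fixed 320x240 input size, so this is simple but
# not the cheapest option.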
def video_frame_callback(frame: av.VideoFrame) -> av.VideoFrame:
    img = frame.to_ndarray(format="bgr24")
    input_size = [320, 240]
    width = input_size[0]
    height = input_size[1]
    priors = define_img_size(input_size)

    # Face detection on a 320x240 RGB copy of the frame
    rect = cv2.resize(img, (width, height))
    rect = cv2.cvtColor(rect, cv2.COLOR_BGR2RGB)
    net.setInput(cv2.dnn.blobFromImage(
        rect, 1 / image_std, (width, height), 127)
    )
    start_time = time.time()
    boxes, scores = net.forward(["boxes", "scores"])
    boxes = np.expand_dims(np.reshape(boxes, (-1, 4)), axis=0)
    scores = np.expand_dims(np.reshape(scores, (-1, 2)), axis=0)
    boxes = convert_locations_to_boxes(
        boxes, priors, center_variance, size_variance
    )
    boxes = center_form_to_corner_form(boxes)
    boxes, labels, probs = predict(
        img.shape[1],
        img.shape[0],
        scores,
        boxes,
        threshold
    )

    # Emotion classification on each detected face
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    for (x1, y1, x2, y2) in boxes:
        # Clamp the box to the frame so the crop is never empty
        x1 = max(int(x1), 0)
        y1 = max(int(y1), 0)
        x2 = min(int(x2), img.shape[1] - 1)
        y2 = min(int(y2), img.shape[0] - 1)
        if x2 <= x1 or y2 <= y1:
            continue
        resize_frame = cv2.resize(gray[y1:y2, x1:x2], (64, 64))
        resize_frame = resize_frame.reshape(1, 1, 64, 64)
        model.setInput(resize_frame)
        output = model.forward()
        end_time = time.time()
        fps = 1 / (end_time - start_time)
        logger.debug("FPS: %.1f", fps)
        pred = emotion_dict[int(np.argmax(output[0]))]
        cv2.rectangle(
            img,
            (x1, y1),
            (x2, y2),
            (215, 5, 247),
            2,
            lineType=cv2.LINE_AA
        )
        cv2.putText(
            img,
            pred,
            (x1, y1 - 10),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.8,
            (215, 5, 247),
            2,
            lineType=cv2.LINE_AA
        )

    return av.VideoFrame.from_ndarray(img, format="bgr24")
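# WebRTC setup: ICE servers come from sample_utils.turn, and
# "iceTransportPolicy": "relay" forces media through a TURN relay, which tends to
# be the more reliable choice when the app is hosted behind restrictive
# NAT/firewalls.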
if __name__ == "__main__":
    webrtc_ctx = webrtc_streamer(
        key="face-emotion-recognition",
        mode=WebRtcMode.SENDRECV,
        rtc_configuration={
            "iceServers": get_ice_servers(),
            "iceTransportPolicy": "relay",
        },
        video_frame_callback=video_frame_callback,
        media_stream_constraints={"video": True, "audio": False},
        async_processing=True,
    )

    st.markdown(
        "This demo uses a model and code from "
        "https://github.com/spmallick/learnopencv. "
        "Many thanks to the project."
    )