File size: 8,471 Bytes
026cee8 10e6ed7 026cee8 10e6ed7 026cee8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
import json
import cv2
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
import logging
import tempfile
from pathlib import Path
import firebase_admin
from firebase_admin import credentials, firestore, storage
from pydantic import BaseModel
from deepface import DeepFace
from tqdm import tqdm
# Set up logging
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Initialize Firebase
try:
cred = credentials.Certificate("serviceAccountKey.json")
firebase_app = firebase_admin.initialize_app(cred, {
'storageBucket': 'future-forge-60d3f.appspot.com'
})
db = firestore.client()
bucket = storage.bucket(app=firebase_app)
logger.info("Firebase initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize Firebase: {str(e)}")
app = FastAPI()
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Define the input model
class FileProcess(BaseModel):
file_path: str
@app.post("/facial-emotion")
async def process_file(file_data: FileProcess):
logger.info(f"Processing file from Firebase Storage: {file_data.file_path}")
try:
# Get the file from Firebase Storage
blob = bucket.blob(file_data.file_path)
# Create a temporary file
with tempfile.NamedTemporaryFile(delete=False, suffix=f".{file_data.file_path.split('.')[-1]}") as tmp_file:
blob.download_to_filename(tmp_file.name)
tmp_file_path = Path(tmp_file.name)
logger.info(f"File downloaded temporarily at: {tmp_file_path}")
file_type = file_data.file_path.split('.')[-1].lower()
result = None
try:
if file_type in ['jpg', 'jpeg', 'png', 'bmp']:
output_image = process_image(tmp_file_path)
result = {"type": "image", "data": output_image}
elif file_type in ['mp4', 'avi', 'mov', 'wmv']:
video_output = process_video(str(tmp_file_path))
result = {"type": "video", "data": video_output}
else:
raise HTTPException(status_code=400, detail="Unsupported file type")
logger.info(f"Processing complete. Result: {result}")
# Store result in Firebase
try:
doc_ref = db.collection('results').add(result)
return {"message": "File processed successfully", "result": result}
except Exception as e:
logger.error(f"Failed to store result in Firebase: {str(e)}")
return {"message": "File processed successfully, but failed to store in Firebase", "result": result,
"error": str(e)}
finally:
# Clean up the temporary file after processing
if tmp_file_path.exists():
tmp_file_path.unlink()
except Exception as e:
logger.error(f"Error processing file: {str(e)}")
raise HTTPException(status_code=500, detail=f"Error processing file: {str(e)}")
def process_video(video_path, output_video_path='output_video.mp4', frame_sample_rate=5):
cap = cv2.VideoCapture(video_path)
# Check if video opened successfully
if not cap.isOpened():
logger.error("Error: Could not open video.")
return None
fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# Define the codec and create VideoWriter object
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
output = {}
frame_index = 0
# Create a progress bar
with tqdm(total=total_frames, desc="Processing video") as pbar:
while True:
ret, frame = cap.read()
if not ret:
logger.info("End of video or cannot capture the frame.")
break
if frame_index % frame_sample_rate == 0: # Only analyze every nth frame
try:
result = DeepFace.analyze(frame, actions=['emotion'], detector_backend='retinaface',enforce_detection=False)
except Exception as e:
logger.error(f"Error analyzing frame {frame_index}: {e}")
output[frame_index] = {}
out.write(frame) # Write the original frame
frame_index += 1
pbar.update(1)
continue # Skip to the next frame
tmp = {}
for face in result:
x, y, w, h = face['region']['x'], face['region']['y'], face['region']['w'], face['region']['h']
emotion = face['dominant_emotion']
emotion_scores = face['emotion']
tmp[(x, y, w, h)] = {'emotion': emotion, 'score': emotion_scores[emotion]}
cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), 2)
cv2.putText(frame, f"{emotion} ({emotion_scores[emotion]:.2f})", (x, y - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2)
output[frame_index] = tmp
out.write(frame) # Write the processed frame
frame_index += 1
pbar.update(1) # Update progress bar
# Release resources
cap.release()
out.release()
# Save the results to a file
with open('results_video.txt', 'w') as file:
for frame_num, faces_info in output.items():
file.write(f"Frame {frame_num} ")
for face_key, info in faces_info.items():
file.write(f"{face_key}: {info}\n")
logger.info(f"Processed {frame_index} frames.")
video_json_output = calculate_emotion_percentages('results_video.txt')
print(video_json_output)
return video_json_output
def process_image(image_path):
image = cv2.imread(image_path)
if image is None:
print(f"Error: Unable to load image from path {image_path}")
return
try:
# Analyze the image for face detection and emotion analysis
result = DeepFace.analyze(image_path, actions=['emotion'], detector_backend='retinaface',enforce_detection=False)
except Exception as e:
print(f"Error analyzing image: {e}")
return image
if len(result) == 0:
print("No faces detected.")
return image # Return the original image if no faces are detected
output = {}
tmp = {}
for i, face in enumerate(result):
# Get bounding box coordinates for each detected face
x, y, w, h = face['region']['x'], face['region']['y'], face['region']['w'], face['region']['h']
# Extract emotion data
emotion = face['dominant_emotion']
emotion_scores = face['emotion']
tmp[i] = {'person':i+1,'emotion': emotion, 'score': f"{emotion_scores[emotion]:.3f}"}
# Draw rectangle around face and label with predicted emotion
cv2.rectangle(image, (x, y), (x + w, y + h), (0, 255, 0), 2)
cv2.putText(image, f"{emotion} ({emotion_scores[emotion]:.3f})", (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8,(0, 255, 0), 2)
output['output'] = tmp
# Save the processed image with bounding boxes and labels
output_image_path = 'output_image_with_emotions.jpg'
cv2.imwrite(output_image_path, image)
print(f"Processed image saved as {output_image_path}")
string_image_output = json.dumps(output['output'])
return string_image_output
def calculate_emotion_percentages(file_path):
emotions = {}
total_frames = 0
with open(file_path, 'r') as file:
for line in file:
if "{'emotion':" in line:
total_frames += 1
emotion = line.split("'emotion': ")[1].split("'")[1]
emotions[emotion] = emotions.get(emotion, 0) + 1
emotion_percentages = [
{"emotion": emotion, "percentage": (count / total_frames) * 100}
for emotion, count in emotions.items()
]
return emotion_percentages
if __name__ == "__main__":
logger.info("Starting the Face Emotion Recognition API")
uvicorn.run(app, host="0.0.0.0", port=7860) |