"""Face emotion recognition API.

Downloads an image or video from Firebase Storage, runs DeepFace emotion
analysis on it, stores the result in Firestore and returns it to the caller.
"""

import json
import logging
import tempfile
from collections import Counter
from pathlib import Path

import cv2
import firebase_admin
import uvicorn
from deepface import DeepFace
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from firebase_admin import credentials, firestore, storage
from pydantic import BaseModel
from tqdm import tqdm

# Set up logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# File extensions (lower-case, without the dot) accepted by the endpoint.
IMAGE_EXTENSIONS = ('jpg', 'jpeg', 'png', 'bmp')
VIDEO_EXTENSIONS = ('mp4', 'avi', 'mov', 'wmv')

# Intermediate text dump of per-frame video results, parsed back by
# calculate_emotion_percentages().
VIDEO_RESULTS_PATH = 'results_video.txt'

# Initialize Firebase. The handles default to None so that a failed
# initialization is detectable at request time instead of surfacing as a
# NameError.
db = None
bucket = None
try:
    cred = credentials.Certificate("serviceAccountKey.json")
    firebase_app = firebase_admin.initialize_app(cred, {
        'storageBucket': 'future-forge-60d3f.appspot.com'
    })
    db = firestore.client()
    bucket = storage.bucket(app=firebase_app)
    logger.info("Firebase initialized successfully")
except Exception as e:
    logger.error("Failed to initialize Firebase: %s", e)

app = FastAPI()

# Add CORS middleware
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive — confirm this is intended for production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Define the input model
class FileProcess(BaseModel):
    """Request body: path of the object inside the Firebase Storage bucket."""

    file_path: str


@app.post("/facial-emotion")
async def process_file(file_data: FileProcess):
    """Download a file from Firebase Storage and run emotion analysis on it.

    Args:
        file_data: Request body carrying the storage path of the file.

    Returns:
        A dict with a "message" and the "result" ({"type": ..., "data": ...});
        an additional "error" key is present when storing the result in
        Firestore failed.

    Raises:
        HTTPException: 400 for an unsupported file extension, 503 when
            Firebase Storage was not initialized, 500 for any other failure.
    """
    logger.info("Processing file from Firebase Storage: %s", file_data.file_path)

    if bucket is None:
        raise HTTPException(status_code=503, detail="Firebase Storage is not available")

    # Validate the extension before downloading anything. Path.suffix never
    # contains a path separator, unlike splitting the whole path on '.'.
    suffix = Path(file_data.file_path).suffix
    file_type = suffix.lstrip('.').lower()
    if file_type not in IMAGE_EXTENSIONS and file_type not in VIDEO_EXTENSIONS:
        raise HTTPException(status_code=400, detail="Unsupported file type")

    try:
        # Reserve a temporary file name; the handle is closed before the
        # download so the file can be reopened by name on every platform.
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
            tmp_file_path = Path(tmp_file.name)

        try:
            # Get the file from Firebase Storage
            blob = bucket.blob(file_data.file_path)
            blob.download_to_filename(str(tmp_file_path))
            logger.info("File downloaded temporarily at: %s", tmp_file_path)

            if file_type in IMAGE_EXTENSIONS:
                result = {"type": "image", "data": process_image(tmp_file_path)}
            else:
                result = {"type": "video", "data": process_video(str(tmp_file_path))}
            logger.info("Processing complete. Result: %s", result)

            # Store result in Firebase; a storage failure is reported but does
            # not discard the analysis result.
            try:
                db.collection('results').add(result)
            except Exception as e:
                logger.error("Failed to store result in Firebase: %s", e)
                return {
                    "message": "File processed successfully, but failed to store in Firebase",
                    "result": result,
                    "error": str(e),
                }
            return {"message": "File processed successfully", "result": result}
        finally:
            # Clean up the temporary file, including when the download failed.
            if tmp_file_path.exists():
                tmp_file_path.unlink()
    except HTTPException:
        # Keep deliberate HTTP errors intact instead of rewrapping them as 500.
        raise
    except Exception as e:
        logger.error("Error processing file: %s", e)
        raise HTTPException(status_code=500, detail=f"Error processing file: {str(e)}") from e


def _annotate_frame(frame, frame_index):
    """Analyze one frame, draw the detections on it and return them.

    Returns a dict mapping each face box (x, y, w, h) to its dominant emotion
    and score. An empty dict is returned, and the frame left untouched, when
    the analysis raises.
    """
    try:
        faces = DeepFace.analyze(frame, actions=['emotion'], detector_backend='retinaface', enforce_detection=False)
    except Exception as e:
        logger.error(f"Error analyzing frame {frame_index}: {e}")
        return {}

    detections = {}
    for face in faces:
        region = face['region']
        x, y, w, h = region['x'], region['y'], region['w'], region['h']
        emotion = face['dominant_emotion']
        score = face['emotion'][emotion]
        detections[(x, y, w, h)] = {'emotion': emotion, 'score': score}
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), 2)
        cv2.putText(frame, f"{emotion} ({score:.2f})", (x, y - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2)
    return detections


def process_video(video_path, output_video_path='output_video.mp4', frame_sample_rate=5):
    """Run emotion analysis on a video and summarize the detected emotions.

    Every ``frame_sample_rate``-th frame is analyzed and annotated; all frames
    are written to ``output_video_path``. Per-frame results are dumped to
    ``results_video.txt`` and summarized from there.

    Args:
        video_path: Path of the input video.
        output_video_path: Where the annotated video is written.
        frame_sample_rate: Analyze one frame out of this many.

    Returns:
        A list of {"emotion": ..., "percentage": ...} dicts, or None when the
        video could not be opened.
    """
    cap = cv2.VideoCapture(video_path)

    # Check if video opened successfully
    if not cap.isOpened():
        logger.error("Error: Could not open video.")
        cap.release()
        return None

    out = None
    output = {}
    frame_index = 0
    try:
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Define the codec and create VideoWriter object
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

        with tqdm(total=total_frames, desc="Processing video") as pbar:
            while True:
                ret, frame = cap.read()
                if not ret:
                    logger.info("End of video or cannot capture the frame.")
                    break

                # Only analyze every nth frame; the others are copied as-is.
                if frame_index % frame_sample_rate == 0:
                    output[frame_index] = _annotate_frame(frame, frame_index)

                out.write(frame)
                frame_index += 1
                pbar.update(1)
    finally:
        # Release resources even if reading, analysis or writing raised.
        cap.release()
        if out is not None:
            out.release()

    # Save the results to a file. The line format is what
    # calculate_emotion_percentages() parses, so it must not change.
    with open(VIDEO_RESULTS_PATH, 'w', encoding='utf-8') as file:
        for frame_num, faces_info in output.items():
            file.write(f"Frame {frame_num} ")
            for face_key, info in faces_info.items():
                file.write(f"{face_key}: {info}\n")

    logger.info(f"Processed {frame_index} frames.")
    video_json_output = calculate_emotion_percentages(VIDEO_RESULTS_PATH)
    logger.info("Video emotion percentages: %s", video_json_output)
    return video_json_output


def process_image(image_path):
    """Run emotion analysis on a single image.

    The annotated image is saved as ``output_image_with_emotions.jpg``.

    Args:
        image_path: Path of the image, as ``str`` or ``pathlib.Path``.

    Returns:
        A JSON string mapping a face index to
        {"person": ..., "emotion": ..., "score": ...}. The string is ``"{}"``
        when the analysis fails or finds no face, so the value is always
        JSON-serializable. None is returned when the image cannot be loaded.
    """
    # OpenCV and DeepFace expect a plain string path, not a Path object.
    image_path = str(image_path)

    image = cv2.imread(image_path)
    if image is None:
        logger.error(f"Error: Unable to load image from path {image_path}")
        return None

    try:
        # Analyze the image for face detection and emotion analysis
        result = DeepFace.analyze(image_path, actions=['emotion'], detector_backend='retinaface', enforce_detection=False)
    except Exception as e:
        logger.error(f"Error analyzing image: {e}")
        return json.dumps({})

    if not result:
        logger.info("No faces detected.")
        return json.dumps({})

    faces = {}
    for i, face in enumerate(result):
        # Get bounding box coordinates for each detected face
        region = face['region']
        x, y, w, h = region['x'], region['y'], region['w'], region['h']

        # Extract emotion data
        emotion = face['dominant_emotion']
        score = face['emotion'][emotion]
        faces[i] = {'person': i + 1, 'emotion': emotion, 'score': f"{score:.3f}"}

        # Draw rectangle around face and label with predicted emotion
        cv2.rectangle(image, (x, y), (x + w, y + h), (0, 255, 0), 2)
        cv2.putText(image, f"{emotion} ({score:.3f})", (x, y - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

    # Save the processed image with bounding boxes and labels
    output_image_path = 'output_image_with_emotions.jpg'
    cv2.imwrite(output_image_path, image)
    logger.info(f"Processed image saved as {output_image_path}")

    return json.dumps(faces)


def calculate_emotion_percentages(file_path):
    """Summarize the emotions recorded in a per-frame results file.

    Each line containing ``{'emotion':`` counts as one face detection.

    Args:
        file_path: Path of the text file written by process_video().

    Returns:
        A list of {"emotion": name, "percentage": share_of_detections} dicts
        in order of first appearance; empty when no detection was recorded.
    """
    emotions = Counter()
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            if "{'emotion':" in line:
                # The emotion name is the first quoted token after the key.
                emotion = line.split("'emotion': ")[1].split("'")[1]
                emotions[emotion] += 1

    total_detections = sum(emotions.values())
    return [
        {"emotion": emotion, "percentage": (count / total_detections) * 100}
        for emotion, count in emotions.items()
    ]


if __name__ == "__main__":
    logger.info("Starting the Face Emotion Recognition API")
    uvicorn.run(app, host="0.0.0.0", port=7860)