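"""Facial emotion recognition API.

A FastAPI service that downloads an image or video from Firebase Storage,
runs DeepFace emotion analysis on it, stores the result in Firestore, and
returns it to the caller.
"""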
import json
import cv2
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
import logging
import tempfile
from pathlib import Path
import firebase_admin
from firebase_admin import credentials, firestore, storage
from pydantic import BaseModel
from deepface import DeepFace
from tqdm import tqdm

# Set up logging
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Initialize Firebase
try:
    cred = credentials.Certificate("serviceAccountKey.json")
    firebase_app = firebase_admin.initialize_app(cred, {
        'storageBucket': 'future-forge-60d3f.appspot.com'
    })
    db = firestore.client()
    bucket = storage.bucket(app=firebase_app)
    logger.info("Firebase initialized successfully")
except Exception as e:
    logger.error(f"Failed to initialize Firebase: {str(e)}")
    raise  # fail fast: db and bucket are required by the endpoint below

app = FastAPI()
# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
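# NOTE: allow_origins=["*"] is convenient for development but permissive;
# restrict the origin list before deploying publicly.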

# Define the input model
class FileProcess(BaseModel):
    file_path: str
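
# Example request body (the storage path is illustrative):
#   POST /facial-emotion
#   {"file_path": "uploads/interview.mp4"}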

@app.post("/facial-emotion")
async def process_file(file_data: FileProcess):
    logger.info(f"Processing file from Firebase Storage: {file_data.file_path}")
    try:
        # Get the file from Firebase Storage
        blob = bucket.blob(file_data.file_path)
        # Create a temporary file
        with tempfile.NamedTemporaryFile(delete=False, suffix=Path(file_data.file_path).suffix) as tmp_file:
            blob.download_to_filename(tmp_file.name)
            tmp_file_path = Path(tmp_file.name)
        logger.info(f"File downloaded temporarily at: {tmp_file_path}")

        file_type = file_data.file_path.split('.')[-1].lower()

        result = None

        try:
            if file_type in ['jpg', 'jpeg', 'png', 'bmp']:
                output_image = process_image(str(tmp_file_path))  # cv2.imread expects a str path
                result = {"type": "image", "data": output_image}
            elif file_type in ['mp4', 'avi', 'mov', 'wmv']:
                video_output = process_video(str(tmp_file_path))
                result = {"type": "video", "data": video_output}
            else:
                raise HTTPException(status_code=400, detail="Unsupported file type")

            logger.info(f"Processing complete. Result: {result}")

            # Store result in Firebase
            try:
                db.collection('results').add(result)
                return {"message": "File processed successfully", "result": result}
            except Exception as e:
                logger.error(f"Failed to store result in Firebase: {str(e)}")
                return {"message": "File processed successfully, but failed to store in Firebase", "result": result,
                        "error": str(e)}
        finally:
            # Clean up the temporary file after processing
            if tmp_file_path.exists():
                tmp_file_path.unlink()
    except HTTPException:
        raise  # preserve intended status codes (e.g. 400 for unsupported types)
    except Exception as e:
        logger.error(f"Error processing file: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error processing file: {str(e)}")
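
# A minimal client sketch (host and port match the uvicorn.run() call below;
# the storage path is hypothetical):
#
#   import requests
#   resp = requests.post("http://localhost:7860/facial-emotion",
#                        json={"file_path": "uploads/sample.mp4"})
#   print(resp.json())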

def process_video(video_path, output_video_path='output_video.mp4', frame_sample_rate=5):
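    """Annotate a video with per-face emotion labels.

    Every frame_sample_rate-th frame is analyzed with DeepFace; the rest are
    written through unchanged. Returns aggregate emotion percentages over the
    analyzed frames.
    """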
    cap = cv2.VideoCapture(video_path)
    # Check if video opened successfully
    if not cap.isOpened():
        logger.error("Error: Could not open video.")
        return None
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    # Define the codec and create VideoWriter object
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
    output = {}
    frame_index = 0
    # Create a progress bar
    with tqdm(total=total_frames, desc="Processing video") as pbar:
        while True:
            ret, frame = cap.read()
            if not ret:
                logger.info("End of video or cannot capture the frame.")
                break

            tmp = {}  # reset per frame so skipped frames don't inherit stale detections
            if frame_index % frame_sample_rate == 0:  # Only analyze every nth frame
                try:
                    result = DeepFace.analyze(frame, actions=['emotion'],
                                              detector_backend='retinaface', enforce_detection=False)
                except Exception as e:
                    logger.error(f"Error analyzing frame {frame_index}: {e}")
                    output[frame_index] = {}
                    out.write(frame)  # Write the original frame
                    frame_index += 1
                    pbar.update(1)
                    continue  # Skip to the next frame
                for face in result:
                    x, y, w, h = face['region']['x'], face['region']['y'], face['region']['w'], face['region']['h']
                    emotion = face['dominant_emotion']
                    emotion_scores = face['emotion']
                    tmp[(x, y, w, h)] = {'emotion': emotion, 'score': emotion_scores[emotion]}

                    # Draw a bounding box and label the face with its dominant emotion
                    cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), 2)
                    cv2.putText(frame, f"{emotion} ({emotion_scores[emotion]:.2f})", (x, y - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2)
            output[frame_index] = tmp
            out.write(frame)  # Write the processed frame
            frame_index += 1
            pbar.update(1)  # Update progress bar
    # Release resources
    cap.release()
    out.release()
    # Save the results to a file
    with open('results_video.txt', 'w') as file:
        for frame_num, faces_info in output.items():
            file.write(f"Frame {frame_num}\n")
            for face_key, info in faces_info.items():
                file.write(f"{face_key}: {info}\n")

    logger.info(f"Processed {frame_index} frames.")
    video_json_output = calculate_emotion_percentages('results_video.txt')
    logger.debug(f"Video emotion summary: {video_json_output}")
    return video_json_output

def process_image(image_path):
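    """Detect faces in an image, draw emotion labels, save an annotated
    copy, and return the per-face results as a JSON string."""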
    image = cv2.imread(image_path)
    if image is None:
        logger.error(f"Unable to load image from path {image_path}")
        return None
    try:
        # Analyze the image for face detection and emotion analysis
        result = DeepFace.analyze(image_path, actions=['emotion'],
                                  detector_backend='retinaface', enforce_detection=False)
    except Exception as e:
        logger.error(f"Error analyzing image: {e}")
        return None
    
    if len(result) == 0:
        logger.info("No faces detected.")
        return json.dumps({})  # keep the return type consistent when no faces are found
    
    output = {}
    tmp = {}
    for i, face in enumerate(result):
        # Get bounding box coordinates for each detected face
        x, y, w, h = face['region']['x'], face['region']['y'], face['region']['w'], face['region']['h']
        # Extract emotion data
        emotion = face['dominant_emotion']
        emotion_scores = face['emotion']
        tmp[i] = {'person': i + 1, 'emotion': emotion, 'score': f"{emotion_scores[emotion]:.3f}"}

        # Draw rectangle around face and label with predicted emotion
        cv2.rectangle(image, (x, y), (x + w, y + h), (0, 255, 0), 2)
        cv2.putText(image, f"{emotion} ({emotion_scores[emotion]:.3f})", (x, y - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
    output['output'] = tmp
    # Save the processed image with bounding boxes and labels
    output_image_path = 'output_image_with_emotions.jpg'
    cv2.imwrite(output_image_path, image)
    print(f"Processed image saved as {output_image_path}")
    string_image_output = json.dumps(output['output'])
    return string_image_output
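
# Example return value of process_image (scores illustrative):
#   '{"0": {"person": 1, "emotion": "happy", "score": "97.312"}}'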

def calculate_emotion_percentages(file_path):
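    """Tally dominant emotions from the results file written by
    process_video() and convert the counts to percentages."""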
    emotions = {}
    total_frames = 0
    with open(file_path, 'r') as file:
        for line in file:
            if "{'emotion':" in line:
                total_frames += 1
                emotion = line.split("'emotion': ")[1].split("'")[1]
                emotions[emotion] = emotions.get(emotion, 0) + 1

    if total_frames == 0:
        return []  # no emotions logged; avoid division by zero

    emotion_percentages = [
        {"emotion": emotion, "percentage": (count / total_frames) * 100}
        for emotion, count in emotions.items()
    ]
    return emotion_percentages
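
# Example return value (percentages illustrative):
#   [{"emotion": "happy", "percentage": 60.0},
#    {"emotion": "neutral", "percentage": 40.0}]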

if __name__ == "__main__":
    logger.info("Starting the Face Emotion Recognition API")
    uvicorn.run(app, host="0.0.0.0", port=7860)