"""Gradio dashboard that classifies the newest JPEG in a Google Drive folder.

On every request the app downloads the most recently modified JPEG from the
configured Drive folder, runs it through the Keras model, plots the top class
probabilities and the motion-detection contours, and pushes a Pushbullet
notification with the top prediction.
"""
import io
import os
from typing import Optional

import gradio as gr
import cv2
import numpy as np
import tensorflow as tf
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
import plotly.graph_objects as go
from PIL import Image
import pushbullet

file = 'api.txt'
with open(file, mode='r') as f:
    text = f.read()

# Constants
# Strip surrounding whitespace: a trailing newline in api.txt would otherwise
# become part of the key that is sent to the Drive API.
API_KEY = text.strip()  # Replace with your API key
FOLDER_ID = "1NV0hemxLCtd04NpURF0gkV0ZN29_IiZ5"  # Replace with your folder ID

# NOTE(security): this access token is committed in source. Prefer supplying
# PUSHBULLET_API_KEY through the environment and rotating the token below; the
# literal is kept only as a backward-compatible fallback.
PUSHBULLET_API_KEY = os.environ.get(
    "PUSHBULLET_API_KEY", "o.8ISYIpsOoRmU7jbLBVrgr7DRuFbYcgRc"
)

# Load the trained model and class labels
model = tf.keras.models.load_model('best_model123.h5')
with open('classes.txt', 'r') as f:
    class_labels = f.read().splitlines()

# Shared across requests so the background model accumulates over frames.
bg_subtractor = cv2.createBackgroundSubtractorMOG2()


def get_latest_image_from_drive() -> Optional[np.ndarray]:
    """Download and decode the most recently modified JPEG in FOLDER_ID.

    Returns:
        The decoded image as produced by ``cv2.imdecode`` with
        ``IMREAD_UNCHANGED``, or ``None`` when the folder holds no JPEG, the
        download is empty, or the bytes cannot be decoded.
    """
    service = build('drive', 'v3', developerKey=API_KEY)
    query = f"'{FOLDER_ID}' in parents and mimeType='image/jpeg'"
    results = service.files().list(
        q=query, orderBy='modifiedTime desc', pageSize=1
    ).execute()
    items = results.get('files', [])
    if not items:
        print("No files found.")
        return None

    file_id = items[0]['id']
    request = service.files().get_media(fileId=file_id)
    fh = io.BytesIO()
    downloader = MediaIoBaseDownload(fh, request)
    done = False
    while not done:
        _, done = downloader.next_chunk()

    image_data = np.frombuffer(fh.getvalue(), dtype=np.uint8)
    if image_data.size == 0:
        # cv2.imdecode raises on an empty buffer; report "no image" instead.
        return None
    return cv2.imdecode(image_data, cv2.IMREAD_UNCHANGED)


def _bgr_to_rgb(img: np.ndarray) -> np.ndarray:
    """Return *img* with OpenCV channel order (BGR/BGRA) converted to RGB.

    Arrays that are not 3- or 4-channel (e.g. grayscale) are returned as-is.
    """
    if img.ndim == 3:
        if img.shape[2] == 4:
            return cv2.cvtColor(img, cv2.COLOR_BGRA2RGB)
        if img.shape[2] == 3:
            return cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    return img


def convert_numpy_array_to_pillow_image(img) -> Image.Image:
    """Convert an OpenCV-ordered array into a Pillow image.

    The array is cast to ``uint8``; 3- and 4-channel inputs are converted to
    RGB (the 4-channel case previously stopped at BGR, which Pillow then
    interpreted as RGB with red and blue swapped).
    """
    return Image.fromarray(_bgr_to_rgb(np.uint8(img)))


def classify_frame(frame):
    """Classify *frame* and return its (up to) four most probable classes.

    Args:
        frame: Image array as decoded by OpenCV; it is resized to 64x64
            before being passed to the model.

    Returns:
        ``(top_labels, top_probabilities)``: two parallel lists ordered from
        most to least probable.
    """
    resized = cv2.resize(frame, (64, 64))
    batch = np.expand_dims(np.array(resized), axis=0)
    batch = tf.keras.applications.mobilenet_v2.preprocess_input(batch)
    scores = model.predict(batch)[0]

    # argsort is ascending: take the last four and reverse for best-first.
    top_indices = np.argsort(scores)[-4:][::-1]
    top_labels = [class_labels[i] for i in top_indices]
    top_probabilities = [scores[i] for i in top_indices]
    return top_labels, top_probabilities


def create_motion_detection_graph(frame) -> go.Figure:
    """Plot the top-left corners of large moving regions found in *frame*.

    The frame is fed to the shared background subtractor; contours of the
    cleaned foreground mask whose area exceeds 1000 pixels contribute one
    marker each, positioned at the corner of their bounding rectangle.
    """
    fg_mask = bg_subtractor.apply(frame)
    fg_mask = cv2.medianBlur(fg_mask, 5)
    fg_mask = cv2.dilate(fg_mask, None, iterations=2)
    contours, _ = cv2.findContours(
        fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )

    xs = []
    ys = []
    for contour in contours:
        if cv2.contourArea(contour) > 1000:
            x, y, _w, _h = cv2.boundingRect(contour)
            xs.append(x)
            ys.append(y)

    fig = go.Figure()
    # Plot real coordinates: previously the (x, y) tuples were passed as the
    # x series and every y value was 0.
    fig.add_trace(
        go.Scatter(x=xs, y=ys, mode='markers', marker_symbol='circle')
    )
    fig.update_layout(
        title='Motion Detection',
        xaxis_title='X Position',
        yaxis_title='Y Position',
    )
    return fig


def create_graph(top_labels, top_probabilities, threshold) -> go.Figure:
    """Build a bar chart of the classes whose probability meets *threshold*.

    Args:
        top_labels: Class names, parallel to *top_probabilities*.
        top_probabilities: Probabilities as fractions in the model's output
            scale.
        threshold: Minimum probability for a class to be shown.
    """
    kept = [
        (label, probability)
        for label, probability in zip(top_labels, top_probabilities)
        if probability >= threshold
    ]
    filtered_top_labels = [label for label, _ in kept]
    filtered_top_probabilities = [probability for _, probability in kept]

    fig = go.Figure()
    fig.add_trace(go.Bar(x=filtered_top_labels, y=filtered_top_probabilities))
    fig.update_layout(
        title='Threat Classification',
        xaxis_title='Class Label',
        yaxis_title='Probability (0-100%)',
        # Values are fractions; render ticks as percentages so the axis
        # agrees with its title.
        yaxis_tickformat='.0%',
    )
    return fig


def send_pushbullet_notification(title, body):
    """Push a note with *title* and *body* through Pushbullet."""
    api_key = PUSHBULLET_API_KEY
    pb = pushbullet.Pushbullet(api_key)
    pb.push_note(title, body)


def process_image(threshold=0, last_image=None):
    """Fetch, classify and annotate the newest Drive image.

    Args:
        threshold: Minimum probability for a class to be charted and drawn
            onto the image.
        last_image: Previously returned image; handed back unchanged when no
            new image is available.

    Returns:
        ``(threat_classification_graph, motion_detection_graph, image)``.
        The two graphs are ``None`` when no image could be fetched. The image
        is returned in RGB channel order for display.
    """
    img = get_latest_image_from_drive()
    if img is None:
        # Nothing to classify: previously this path fell through to unbound
        # locals and crashed.
        return None, None, last_image

    # Classify the image
    top_labels, top_probabilities = classify_frame(img)

    # Create Threat Classification graph
    threat_classification_graph = create_graph(
        top_labels, top_probabilities, threshold
    )

    # Create Motion Detection graph (before annotating, so the overlay does
    # not feed into the background model)
    motion_detection_graph = create_motion_detection_graph(img)

    # Annotate the freshly fetched frame. Previously, when last_image was
    # supplied, the overlay was drawn on it and then discarded.
    # NOTE(review): every label is written at the same position and a fixed
    # filled rectangle is painted onto the frame; confirm the intended layout.
    for label, probability in zip(top_labels, top_probabilities):
        if probability >= threshold:
            cv2.putText(
                img,
                f"{label}: {probability:.2f}",
                (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX,
                1,
                (0, 0, 255),
                2,
            )
            cv2.rectangle(img, (10, 40), (300, 100), (0, 0, 255), -1)

    # Send a Pushbullet notification
    top_label = top_labels[0] if top_labels else "Unknown"
    top_probability = top_probabilities[0] if top_probabilities else 0.0
    send_pushbullet_notification(
        f"Threat Classification - {top_label}",
        f"Probability: {top_probability:.2f}",
    )

    # OpenCV decodes to BGR while the Gradio image output treats arrays as
    # RGB; convert so colours are not swapped on screen.
    last_image = _bgr_to_rgb(img)
    return threat_classification_graph, motion_detection_graph, last_image


# Create the Gradio interface
intf = gr.Interface(
    fn=process_image,
    inputs=gr.Slider(minimum=0.0, maximum=1, value=0.5, label="Threshold"),
    outputs=[gr.Plot(), gr.Plot(), gr.Image()],
    title='Threat Classification from Google Drive Image',
    theme=gr.themes.Default(),
)

# Display the interface
intf.launch(debug=True, share=True)