# Import all required libraries
import os
import io
import cv2
import numpy as np
from PIL import Image, ImageDraw
from transformers import AutoImageProcessor, AutoModelForObjectDetection
import streamlit as st
import torch
import time
import pandas as pd
# Setting page layout
st.set_page_config(
    page_title="YoloS Helmet Detection",
    page_icon="🤗",
    layout="wide",
    initial_sidebar_state="expanded"
)
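# Read the raw bytes of an uploaded file and return them as a PIL image.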
def input_image_setup(uploaded_file):
    if uploaded_file is not None:
        bytes_data = uploaded_file.getvalue()
        image = Image.open(io.BytesIO(bytes_data))  # Convert bytes data to PIL image
        return image
    else:
        raise FileNotFoundError("No file uploaded")
# Function to convert OpenCV image to PIL image
def cv2_to_pil(image):
    return Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
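# Overlay a purple rectangle and a "<label> (<score>)" tag on the image for every
# detection whose score meets the selected confidence threshold.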
def draw_bounding_boxes(image, results, model, confidence):
    draw = ImageDraw.Draw(image)
    for score, label, box in zip(results["scores"], results["labels"], results["boxes"]):
        if score.item() >= confidence:
            box = [int(i) for i in box.tolist()]
            draw.rectangle(box, outline="purple", width=2)
            label_text = f"{model.config.id2label[label.item()]} ({round(score.item(), 2)})"
            draw.text((box[0], box[1]), label_text, fill="white")
    return image
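# Run the detector on a PIL image and return the post-processed detections
# (a dict with "scores", "labels" and "boxes" in pixel coordinates).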
def process_image(image, model, processor, confidence):
    inputs = processor(images=image, return_tensors="pt")
    with torch.no_grad():  # inference only, no gradients needed
        outputs = model(**inputs)
    target_sizes = torch.tensor([image.size[::-1]])  # (height, width) expected by the post-processor
    results = processor.post_process_object_detection(outputs, threshold=confidence, target_sizes=target_sizes)[0]
    return results
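# Convert one frame's detections into a plain dict so they can be accumulated
# in session state and later flattened into a CSV.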
def detection_results_to_dict(results, model):
    detection_dict = {
        "objects": []
    }
    for score, label, box in zip(results["scores"], results["labels"], results["boxes"]):
        box = [round(i, 2) for i in box.tolist()]
        detection_dict["objects"].append({
            "label": model.config.id2label[label.item()],
            "confidence": round(score.item(), 3),
            "box": box
        })
    return detection_dict
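# Flatten a list of detection dicts (one per frame/image) into a single CSV,
# returned as UTF-8 bytes for st.download_button.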
def convert_dict_to_csv(detection_dict_list):
    combined_results = []
    for detection_dict in detection_dict_list:
        combined_results.extend(detection_dict["objects"])
    df = pd.DataFrame(combined_results)
    return df.to_csv(index=False).encode('utf-8')
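# Callback for the "Clear Results" button: drop all stored detections.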
def clear_detection_results():
    st.session_state.detection_dict_list = []
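# The streaming modes (webcam/RTSP) append one detection dict per processed frame
# to this list; because it lives in session state it survives Streamlit reruns,
# so results can still be exported after a run is stopped.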
# Initialize session state to store detection results
if 'detection_dict_list' not in st.session_state:
    st.session_state.detection_dict_list = []
# Streamlit App Configuration
st.header("Helmet Rider Detection")
# Sidebar for Model Selection and Confidence Slider
st.sidebar.header("ML Model Config")
models = ["gghsgn/final200", "gghsgn/final100", "gghsgn/final50"]
model_name = st.sidebar.selectbox("Select model", models)
confidence = st.sidebar.slider("Select Model Confidence", 25, 100, 40, step=5) / 100
# Load Model and Processor
processor = AutoImageProcessor.from_pretrained(model_name)
model = AutoModelForObjectDetection.from_pretrained(model_name)
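# Note: the processor and model above are re-instantiated on every Streamlit
# rerun (each widget interaction). A cached loader would avoid that; the sketch
# below is only a suggestion, assuming Streamlit >= 1.18 (st.cache_resource),
# and load_detector is a hypothetical helper name, not part of this app.
# @st.cache_resource
# def load_detector(name):
#     return (AutoImageProcessor.from_pretrained(name),
#             AutoModelForObjectDetection.from_pretrained(name))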
# Option to select Real-Time or Upload Image
mode = st.sidebar.selectbox("Select Input Mode", ["Upload Image", "Real-Time Webcam", "RTSP Video"])
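# Each input mode below follows the same pattern: get an image (upload, webcam
# frame, or RTSP frame), run the detector, draw the boxes, list the detections,
# and offer the results as a CSV download.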
# Option if select Upload Image
if mode == "Upload Image":
    uploaded_file = st.file_uploader("Choose an image...", type=["jpg", "jpeg", "png"])
    if uploaded_file is not None:
        image = Image.open(uploaded_file)
        st.image(image, caption="Uploaded Image.", use_column_width=True)
    else:
        image = None
    submit = st.button("Detect Objects")
    if submit and image is not None:
        try:
            image_data = input_image_setup(uploaded_file)
            st.subheader("The response is..")
            results = process_image(image, model, processor, confidence)
            drawn_image = draw_bounding_boxes(image.copy(), results, model, confidence)
            st.image(drawn_image, caption="Detected Objects", use_column_width=True)
            st.subheader("List of Objects:")
            for score, label, box in zip(results["scores"], results["labels"], results["boxes"]):
                box = [round(i, 2) for i in box.tolist()]
                st.write(
                    f"Detected :orange[{model.config.id2label[label.item()]}] with confidence "
                    f":green[{round(score.item(), 3)}] at location :violet[{box}]"
                )
            detected_objects = {model.config.id2label[label.item()]: 0 for label in results["labels"]}
            for label in results["labels"]:
                detected_objects[model.config.id2label[label.item()]] += 1
            for obj, count in detected_objects.items():
                st.write(f"Class :orange[{obj}] detected {count} time(s)")
            detection_dict = detection_results_to_dict(results, model)
            #st.write(detection_dict)
            csv_data = convert_dict_to_csv([detection_dict])
            st.download_button(
                label="Download Results as CSV",
                data=csv_data,
                file_name="detection_results.csv",
                mime="text/csv"
            )
        except Exception as e:
            st.error(f"Error: {e}")
    elif submit and image is None:
        st.error("Please upload an image before trying to detect objects.")
# Option if select Realtime Webcam
elif mode == "Real-Time Webcam":
    run = st.checkbox("Run Webcam")
    FRAME_WINDOW = st.image([])
    if run:
        cap = cv2.VideoCapture(0)
        while run:
            ret, frame = cap.read()
            if not ret:
                st.error("Failed to capture image from webcam")
                break
            frame_pil = cv2_to_pil(frame)
            try:
                results = process_image(frame_pil, model, processor, confidence)
                drawn_image = draw_bounding_boxes(frame_pil.copy(), results, model, confidence)
                FRAME_WINDOW.image(drawn_image, caption="Detected Objects", use_column_width=True)
                st.subheader("List of Objects:")
                for score, label, box in zip(results["scores"], results["labels"], results["boxes"]):
                    box = [round(i, 2) for i in box.tolist()]
                    st.write(
                        f"Detected :orange[{model.config.id2label[label.item()]}] with confidence "
                        f":green[{round(score.item(), 3)}] at location :violet[{box}]"
                    )
                detected_objects = {model.config.id2label[label.item()]: 0 for label in results["labels"]}
                for label in results["labels"]:
                    detected_objects[model.config.id2label[label.item()]] += 1
                for obj, count in detected_objects.items():
                    st.write(f"Class :orange[{obj}] detected {count} time(s)")
                detection_dict = detection_results_to_dict(results, model)
                st.session_state.detection_dict_list.append(detection_dict)
                #st.write(detection_dict)
            except Exception as e:
                st.error(f"Error: {e}")
            time.sleep(0.1)  # Delay for the next frame capture to create an illusion of real-time
        cap.release()
    if not run and st.session_state.detection_dict_list:
        st.write("Detection stopped.")
        csv_data = convert_dict_to_csv(st.session_state.detection_dict_list)
        st.download_button(
            label="Download All Results as CSV",
            data=csv_data,
            file_name="all_detection_results.csv",
            mime="text/csv"
        )
        st.button("Clear Results", on_click=clear_detection_results)
# Option if select Realtime RTSP Video
elif mode == "RTSP Video":
    rtsp_url = st.text_input("RTSP URL")
    run = st.checkbox("Run RTSP Video")
    FRAME_WINDOW = st.image([])
    if rtsp_url and run:
        cap = cv2.VideoCapture(rtsp_url)
        st.subheader("List of Objects:")
        while run:
            ret, frame = cap.read()
            if not ret:
                st.error("Failed to capture image from RTSP stream")
                break
            frame_pil = cv2_to_pil(frame)
            try:
                results = process_image(frame_pil, model, processor, confidence)
                drawn_image = draw_bounding_boxes(frame_pil.copy(), results, model, confidence)
                FRAME_WINDOW.image(drawn_image, caption="Detected Objects", use_column_width=True)
                st.subheader("List of Objects:")
                for score, label, box in zip(results["scores"], results["labels"], results["boxes"]):
                    box = [round(i, 2) for i in box.tolist()]
                    st.write(
                        f"Detected :orange[{model.config.id2label[label.item()]}] with confidence "
                        f":green[{round(score.item(), 3)}] at location :violet[{box}]"
                    )
                detected_objects = {model.config.id2label[label.item()]: 0 for label in results["labels"]}
                for label in results["labels"]:
                    detected_objects[model.config.id2label[label.item()]] += 1
                for obj, count in detected_objects.items():
                    st.write(f"Class :orange[{obj}] detected {count} time(s)")
                detection_dict = detection_results_to_dict(results, model)
                st.session_state.detection_dict_list.append(detection_dict)
                #st.write(detection_dict)
            except Exception as e:
                st.error(f"Error: {e}")
            time.sleep(0.1)  # Delay for the next frame capture to create an illusion of real-time
        cap.release()
    if not run and st.session_state.detection_dict_list:
        st.write("Detection stopped.")
        csv_data = convert_dict_to_csv(st.session_state.detection_dict_list)
        st.download_button(
            label="Download All Results as CSV",
            data=csv_data,
            file_name="all_detection_results.csv",
            mime="text/csv"
        )
        st.button("Clear Results", on_click=clear_detection_results)
    elif run:
        st.error("Please provide a valid RTSP URL before running the stream.")
    # Ensure the video capture object is released if the checkbox is unchecked
    if 'cap' in locals():
        cap.release()