Spaces:
Running
Running
import matplotlib | |
matplotlib.use('Agg') | |
import streamlit as st | |
import cv2 | |
import numpy as np | |
from yolov5 import YOLOv5 | |
from sort.sort import Sort | |
import tempfile | |
import shutil | |
from moviepy.editor import VideoFileClip, concatenate_videoclips, ImageSequenceClip | |
import os | |
# Load the pre-trained model and initialize the SORT tracker | |
model_path = 'yolov5s.pt' # Ensure this path points to the model file | |
model = YOLOv5(model_path, device='cpu') | |
tracker = Sort() | |
def process_video(uploaded_file): | |
# Save the uploaded file to a temporary location | |
temp_file_path = "temp_video.mp4" | |
with open(temp_file_path, "wb") as temp_file: | |
temp_file.write(uploaded_file.getbuffer()) | |
# Use moviepy to read the video file | |
video_clip = VideoFileClip(temp_file_path) | |
total_frames = int(video_clip.fps * video_clip.duration) | |
width, height = video_clip.size | |
# Temporary directory to save processed video frames | |
temp_dir = tempfile.mkdtemp() | |
unique_cars = set() | |
progress_bar = st.progress(0) | |
for frame_idx, frame in enumerate(video_clip.iter_frames()): | |
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) | |
progress_percentage = int((frame_idx + 1) / total_frames * 100) | |
progress_bar.progress(progress_percentage) | |
# Detection and tracking logic | |
results = model.predict(frame) | |
preds = results.pandas().xyxy[0] | |
detections = [] | |
for index, row in preds.iterrows(): | |
if row['name'] == 'car': | |
xmin, ymin, xmax, ymax, conf = row['xmin'], row['ymin'], row['xmax'], row['ymax'], row['confidence'] | |
detections.append([xmin, ymin, xmax, ymax, conf]) | |
if detections: | |
detections_np = np.array(detections) | |
trackers = tracker.update(detections_np) | |
for d in trackers: | |
unique_cars.add(int(d[4])) | |
xmin, ymin, xmax, ymax = map(int, d[:4]) | |
cv2.rectangle(frame, (xmin, ymin), (xmax, ymax), (255, 0, 0), 2) | |
cv2.putText(frame, f'ID: {int(d[4])}', (xmin, ymin - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 0, 0), 2) | |
cv2.putText(frame, f'Unique Cars: {len(unique_cars)}', (10, 35), cv2.FONT_HERSHEY_SIMPLEX, 1.25, (0, 255, 0), 2) | |
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # Convert back to RGB for moviepy | |
cv2.imwrite(f"{temp_dir}/{frame_idx:04d}.jpg", frame) | |
frames_files = [os.path.join(temp_dir, f"{i:04d}.jpg") for i in range(total_frames)] | |
clip = ImageSequenceClip(frames_files, fps=video_clip.fps) | |
output_video_path = 'processed_video.mp4' | |
clip.write_videofile(output_video_path, codec='libx264') # Use libx264 codec for compatibility | |
# Remove temporary directory and temporary files | |
shutil.rmtree(temp_dir) | |
return output_video_path | |
def main(): | |
# Initialize session state variables if they don't exist | |
if 'output_video_path' not in st.session_state: | |
st.session_state.output_video_path = None | |
st.sidebar.image("logo.jpg", use_column_width=True) | |
uploaded_file = st.sidebar.file_uploader("Upload a video", type=['mp4']) | |
st.title("Car Detection and Tracking") | |
if uploaded_file is not None: | |
# Process the video only if it hasn't been processed yet or a new file is uploaded | |
if st.session_state.output_video_path is None or st.session_state.uploaded_file_name != uploaded_file.name: | |
st.session_state.uploaded_file_name = uploaded_file.name | |
st.session_state.output_video_path = process_video(uploaded_file) | |
# Display the processed video | |
st.video(st.session_state.output_video_path) | |
# Provide a download link for the processed video | |
with open(st.session_state.output_video_path, "rb") as file: | |
st.download_button("Download Processed Video", file, file_name="processed_video.mp4") | |
if __name__ == "__main__": | |
main() | |