import matplotlib matplotlib.use('Agg') import streamlit as st import cv2 import numpy as np from yolov5 import YOLOv5 from sort.sort import Sort import tempfile import shutil from moviepy.editor import VideoFileClip, concatenate_videoclips, ImageSequenceClip import os # Load the pre-trained model and initialize the SORT tracker model_path = 'yolov5s.pt' # Ensure this path points to the model file model = YOLOv5(model_path, device='cpu') tracker = Sort() def process_video(uploaded_file): # Save the uploaded file to a temporary location temp_file_path = "temp_video.mp4" with open(temp_file_path, "wb") as temp_file: temp_file.write(uploaded_file.getbuffer()) # Use moviepy to read the video file video_clip = VideoFileClip(temp_file_path) total_frames = int(video_clip.fps * video_clip.duration) width, height = video_clip.size # Temporary directory to save processed video frames temp_dir = tempfile.mkdtemp() unique_cars = set() progress_bar = st.progress(0) for frame_idx, frame in enumerate(video_clip.iter_frames()): frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) progress_percentage = int((frame_idx + 1) / total_frames * 100) progress_bar.progress(progress_percentage) # Detection and tracking logic results = model.predict(frame) preds = results.pandas().xyxy[0] detections = [] for index, row in preds.iterrows(): if row['name'] == 'car': xmin, ymin, xmax, ymax, conf = row['xmin'], row['ymin'], row['xmax'], row['ymax'], row['confidence'] detections.append([xmin, ymin, xmax, ymax, conf]) if detections: detections_np = np.array(detections) trackers = tracker.update(detections_np) for d in trackers: unique_cars.add(int(d[4])) xmin, ymin, xmax, ymax = map(int, d[:4]) cv2.rectangle(frame, (xmin, ymin), (xmax, ymax), (255, 0, 0), 2) cv2.putText(frame, f'ID: {int(d[4])}', (xmin, ymin - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 0, 0), 2) cv2.putText(frame, f'Unique Cars: {len(unique_cars)}', (10, 35), cv2.FONT_HERSHEY_SIMPLEX, 1.25, (0, 255, 0), 2) frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # Convert back to RGB for moviepy cv2.imwrite(f"{temp_dir}/{frame_idx:04d}.jpg", frame) frames_files = [os.path.join(temp_dir, f"{i:04d}.jpg") for i in range(total_frames)] clip = ImageSequenceClip(frames_files, fps=video_clip.fps) output_video_path = 'processed_video.mp4' clip.write_videofile(output_video_path, codec='libx264') # Use libx264 codec for compatibility # Remove temporary directory and temporary files shutil.rmtree(temp_dir) return output_video_path def main(): # Initialize session state variables if they don't exist if 'output_video_path' not in st.session_state: st.session_state.output_video_path = None st.sidebar.image("logo.jpg", use_column_width=True) uploaded_file = st.sidebar.file_uploader("Upload a video", type=['mp4']) st.title("Car Detection and Tracking") if uploaded_file is not None: # Process the video only if it hasn't been processed yet or a new file is uploaded if st.session_state.output_video_path is None or st.session_state.uploaded_file_name != uploaded_file.name: st.session_state.uploaded_file_name = uploaded_file.name st.session_state.output_video_path = process_video(uploaded_file) # Display the processed video st.video(st.session_state.output_video_path) # Provide a download link for the processed video with open(st.session_state.output_video_path, "rb") as file: st.download_button("Download Processed Video", file, file_name="processed_video.mp4") if __name__ == "__main__": main()