Spaces:
Configuration error
Configuration error
from fastapi import FastAPI, Request, UploadFile, File,HTTPException | |
from fastapi.responses import HTMLResponse, JSONResponse | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.templating import Jinja2Templates | |
from ultralytics import YOLO | |
import aiofiles | |
import os | |
from pathlib import Path | |
import json | |
from jinja2 import Environment, FileSystemLoader | |
import uvicorn | |
from fastapi.middleware.cors import CORSMiddleware # Import CORS middleware | |
# add the fuzzy | |
import requests | |
#extract function from the video_processing path: | |
from video_processing.video_frames_opt import extract_frames_and_detect_objects, lik, lik_prediction | |
from video_processing.video_frames_new import extract_frames_and_detect_new_objects | |
import pandas as pd | |
import numpy as np | |
import cv2 | |
import logging | |
import os | |
from dotenv import load_dotenv | |
import boto3 | |
from dotenv import dotenv_values | |
# Send log records (DEBUG and above) to a local file.
logging.basicConfig(level=logging.DEBUG, filename="uvicorn.log")

# Standalone Jinja2 environment rooted at the "templates" directory.
jinja_env = Environment(loader=FileSystemLoader("templates"))

# NOTE(review): presumably a workaround for duplicate OpenMP runtimes being
# loaded by the ML dependencies -- confirm it is still required.
os.environ.update({"KMP_DUPLICATE_LIB_OK": "TRUE"})

# The ASGI application object.
app = FastAPI()

# CORS policy: requests from any origin are accepted.
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive -- confirm this is intended for the deployed service.
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["*"],
)

# Files under ./static are served at the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")

# Template renderer used by the HTML handlers.
templates = Jinja2Templates(directory="templates")

# Loading of the exported ONNX model is currently disabled:
#onnx_model = YOLO('yolov8n.onnx')

# Destination directories for uploaded videos (one per upload handler).
video_directory = Path("uploaded_videos")
video_directory_new = Path("uploaded_video_new")

# Uploads are copied to disk in pieces of this many bytes (1 MB).
CHUNK_SIZE = 1024 ** 2
#################################################################
# Anonymous (unsigned) S3 access for publicly readable objects.
#
# `import botocore` on its own does not load the `botocore.client` or
# `botocore.exceptions` submodules; both are imported explicitly here so the
# attribute lookups in this file do not depend on another package having
# imported them first.
import botocore
import botocore.client
import botocore.exceptions

# Requests made through this client are not signed with AWS credentials.
s3 = boto3.client('s3', config=botocore.client.Config(signature_version=botocore.UNSIGNED))
async def download_and_process_s3_file(s3_url: str):
    """Download a publicly readable S3 object, run detection on it and save the results.

    The object is fetched over HTTPS from ``https://<bucket>.s3.amazonaws.com/<key>``,
    written to ``uploaded_video/<file name>``, passed through
    ``extract_frames_and_detect_objects``, ``lik`` and ``lik_prediction``, and the
    final value is written to ``uploaded_videos/<file name>.json``.

    Args:
        s3_url: Location in the form ``s3://<bucket>/<key>``.

    Returns:
        The value returned by ``lik_prediction`` (it is also passed to
        ``file.write``, so it is expected to be a string).

    Raises:
        HTTPException: status 400 when the URL has no bucket or no file name,
            when the HTTP download fails or returns an error status, or when a
            botocore ``ClientError`` is raised.
    """
    # Split "s3://bucket/key" into bucket and key. partition() never raises,
    # so a URL without a key is reported as a client error below instead of
    # surfacing as an unhandled ValueError.
    bucket_name, _, object_key = s3_url.replace("s3://", "").partition("/")
    file_name = object_key.split("/")[-1]  # original file name of the object
    if not bucket_name or file_name in ("", ".", ".."):
        raise HTTPException(
            status_code=400,
            detail=f"Error processing video: invalid S3 URL {s3_url!r}",
        )

    public_url = f"https://{bucket_name}.s3.amazonaws.com/{object_key}"
    file_path = os.path.join("uploaded_video", file_name)

    try:
        # NOTE: requests is synchronous, so the event loop is blocked while
        # the download runs. The timeout bounds connect/read stalls.
        response = requests.get(public_url, timeout=(10, 60))
        # Without this check a 403/404 error body would be stored and then
        # processed as if it were the video.
        response.raise_for_status()

        os.makedirs("uploaded_video", exist_ok=True)
        with open(file_path, "wb") as f:
            f.write(response.content)

        results = extract_frames_and_detect_objects(file_path)
        json_data = lik(results)
        processed_results = lik_prediction(json_data)

        # Persist the tracking results next to the other uploads.
        video_name = os.path.basename(file_path)
        os.makedirs("uploaded_videos", exist_ok=True)
        json_results_path = f"uploaded_videos/{video_name}.json"
        with open(json_results_path, "w") as json_file:
            json_file.write(processed_results)
        return processed_results
    except (requests.exceptions.RequestException, botocore.exceptions.ClientError) as e:
        raise HTTPException(status_code=400, detail=f"Error processing video: {str(e)}") from e
# Settings for the private S3 bucket, read from the process environment.
# Each value is None when the corresponding variable is not set.
AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY")
AWS_SECRET_KEY = os.getenv("AWS_SECRET_KEY")
AWS_S3_BUCKET_NAME = os.getenv("AWS_S3_BUCKET_NAME")
FOLDER_NAME = os.getenv("FOLDER_NAME")

# Startup trace: show which folder was picked up, followed by a marker line.
print(FOLDER_NAME)
print('_url_upload_video_s3_')
def download_file_from_s3(bucket_name, access_key, secret_key, folder_name, name_for_s3):
    """Fetch ``<folder_name>/<name_for_s3>`` from an S3 bucket into ``uploaded_video/``.

    A file that already exists locally is reused and no S3 client is created
    for it.

    Args:
        bucket_name: Name of the S3 bucket to read from.
        access_key: AWS access key id used to build the client.
        secret_key: AWS secret access key used to build the client.
        folder_name: Key prefix (folder) inside the bucket.
        name_for_s3: Object name inside ``folder_name``; also used as the
            local file name.

    Returns:
        The local path ``uploaded_video/<name_for_s3>`` on success, or ``None``
        when the download fails (the error is printed).
    """
    # Local destination of the downloaded file.
    file_path = f"uploaded_video/{name_for_s3}"

    # Reuse an earlier download instead of contacting S3 again.
    if os.path.exists(file_path):
        return file_path

    s3_client = boto3.client(
        service_name='s3',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )
    # Use the folder passed by the caller rather than the module-level global,
    # so the parameter actually controls where the object is read from.
    s3_source_path = f"{folder_name}/{name_for_s3}"

    try:
        # The destination directory must exist before boto3 can write into it.
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        s3_client.download_file(bucket_name, s3_source_path, file_path)
    except Exception as e:
        # Best-effort contract: callers test the return value for failure.
        print(f"Error downloading file from S3: {e}")
        return None
    return file_path
# Download a file from the configured S3 bucket, process it, and save the JSON tracking results.
async def process_and_save_file(file_name: str):
    """Download ``file_name`` from the configured bucket, run detection and save the results.

    The video is fetched with ``download_file_from_s3`` using the module-level
    AWS settings, passed through ``extract_frames_and_detect_objects``, ``lik``
    and ``lik_prediction``, and the final value is written to
    ``uploaded_videos/<video name>.json``.

    NOTE(review): no route decorator is visible on this function in this view;
    confirm how it is registered with ``app``.

    Args:
        file_name: Name of the object inside the configured S3 folder.

    Returns:
        The value returned by ``lik_prediction``, or
        ``{"message": "File download failed"}`` when the download did not
        produce a local file.
    """
    video_file_path = download_file_from_s3(
        AWS_S3_BUCKET_NAME, AWS_ACCESS_KEY, AWS_SECRET_KEY, FOLDER_NAME, file_name
    )
    if not video_file_path:
        return {"message": "File download failed"}

    # These calls are synchronous, so the event loop is blocked while they run.
    results = extract_frames_and_detect_objects(video_file_path)
    json_data = lik(results)
    processed_results = lik_prediction(json_data)

    # The output directory is otherwise only created by the upload handler, so
    # make sure it exists before writing the tracking results.
    os.makedirs("uploaded_videos", exist_ok=True)
    video_name = os.path.basename(video_file_path)
    json_results_path = f"uploaded_videos/{video_name}.json"
    with open(json_results_path, "w") as json_file:
        json_file.write(processed_results)
    return processed_results
################################################################# | |
async def read_root(request: Request):
    """Render ``index.html`` with the incoming request in the template context.

    NOTE(review): no route decorator is visible on this function in this view;
    confirm how it is registered with ``app``.
    """
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
async def upload_image_form(request: Request):
    """Render ``upload_image.html`` with the incoming request in the template context.

    NOTE(review): no route decorator is visible on this function in this view;
    confirm how it is registered with ``app``.
    """
    context = {"request": request}
    return templates.TemplateResponse("upload_image.html", context)
async def upload_video(video: UploadFile = File(...)):
    """Store an uploaded video, run detection on it and save the results as JSON.

    The upload is copied to ``video_directory`` in ``CHUNK_SIZE`` pieces, passed
    through ``extract_frames_and_detect_objects``, ``lik`` and
    ``lik_prediction``, and the final value is written to
    ``<video_directory>/<file name>.json``.

    NOTE(review): no route decorator is visible on this function in this view;
    confirm how it is registered with ``app``.

    Args:
        video: The uploaded file.

    Returns:
        The value returned by ``lik_prediction``.

    Raises:
        HTTPException: status 400 when the upload carries no usable file name.
    """
    # The file name is supplied by the client. Keep only its final path
    # component so names such as "../../x" cannot escape the upload directory.
    safe_name = Path(video.filename or "").name
    if safe_name in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Uploaded file has no usable filename")

    # Ensure the directory exists, create it if it doesn't.
    video_directory.mkdir(parents=True, exist_ok=True)
    video_path = video_directory / safe_name

    # Copy the upload to disk chunk by chunk so it is never held in memory whole.
    async with aiofiles.open(video_path, "wb") as buffer:
        while True:
            chunk = await video.read(CHUNK_SIZE)
            if not chunk:
                break
            await buffer.write(chunk)

    # These calls are synchronous, so the event loop is blocked while they run.
    results = extract_frames_and_detect_objects(str(video_path))
    json_data = lik(results)
    processed_results = lik_prediction(json_data)

    # Save the tracking results next to the stored video.
    json_results_path = video_directory / f"{safe_name}.json"
    with open(str(json_results_path), "w") as json_file:
        json_file.write(processed_results)

    # Return the processed JSON data.
    return processed_results
# Upload a video and return the detection results directly.
async def upload_video_new(video: UploadFile = File(...)):
    """Store an uploaded video and return the result of the "new objects" detector.

    The upload is copied to ``video_directory_new`` in ``CHUNK_SIZE`` pieces and
    the stored file is passed to ``extract_frames_and_detect_new_objects``.

    NOTE(review): no route decorator is visible on this function in this view;
    confirm how it is registered with ``app``.

    Args:
        video: The uploaded file.

    Returns:
        The value returned by ``extract_frames_and_detect_new_objects``.

    Raises:
        HTTPException: status 400 when the upload carries no usable file name.
    """
    # The file name is supplied by the client. Keep only its final path
    # component so names such as "../../x" cannot escape the upload directory.
    safe_name = Path(video.filename or "").name
    if safe_name in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Uploaded file has no usable filename")

    # Ensure the directory exists, create it if it doesn't.
    video_directory_new.mkdir(parents=True, exist_ok=True)
    video_path = video_directory_new / safe_name

    # Copy the upload to disk chunk by chunk so it is never held in memory whole.
    async with aiofiles.open(video_path, "wb") as buffer:
        while True:
            chunk = await video.read(CHUNK_SIZE)
            if not chunk:
                break
            await buffer.write(chunk)

    # Synchronous call: the event loop is blocked while detection runs.
    results = extract_frames_and_detect_new_objects(str(video_path))
    return results
if __name__ == "__main__":
    # When executed as a script, serve the app on every interface, port 8000.
    server_options = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **server_options)