"""Gradio front-end that runs ControlVideo's ``inference.py`` on an uploaded video.

On import the module downloads the required checkpoints, defines the video
pre-processing helpers, builds the UI and launches it.
"""
import math
import os
import subprocess

import cv2
import gradio as gr
import numpy as np
from huggingface_hub import snapshot_download
from moviepy.editor import VideoFileClip, concatenate_videoclips

model_ids = [
    'runwayml/stable-diffusion-v1-5',
    'lllyasviel/sd-controlnet-depth',
    'lllyasviel/sd-controlnet-canny',
    'lllyasviel/sd-controlnet-openpose',
]

# Fetch every checkpoint into checkpoints/<repo name> before the UI starts.
for model_id in model_ids:
    model_name = model_id.split('/')[-1]
    snapshot_download(model_id, local_dir=f'checkpoints/{model_name}')


def get_frame_count(filepath):
    """Return a Gradio update that sets the slider maximum to the video's frame count.

    Args:
        filepath: Path of the uploaded video, or ``None`` when the input was cleared.

    Returns:
        ``gr.update(maximum=...)`` for a video, or an empty ``gr.update()``
        (no change) when there is no file.
    """
    if not filepath:
        # The change event also fires when the video is cleared; there is
        # nothing to measure, so leave the slider untouched.
        return gr.update()

    video = cv2.VideoCapture(filepath)
    try:
        frame_count = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
    finally:
        video.release()

    # The slider is declared with minimum=1; never hand it a smaller maximum
    # (an unreadable file reports 0 frames).
    return gr.update(maximum=max(frame_count, 1))


def get_video_dimension(filepath):
    """Read basic properties of a video file.

    Args:
        filepath: Path of the video to inspect.

    Returns:
        Tuple ``(width, height, fps, frame_count)``, each truncated to ``int``.
    """
    video = cv2.VideoCapture(filepath)
    try:
        width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = int(video.get(cv2.CAP_PROP_FPS))
        frame_count = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
    finally:
        video.release()
    return width, height, fps, frame_count


def resize_video(input_vid, output_vid, width, height, fps):
    """Re-encode ``input_vid`` as an mp4v video with every frame resized.

    Args:
        input_vid: Path of the source video.
        output_vid: Path the resized video is written to.
        width: Target frame width in pixels.
        height: Target frame height in pixels.
        fps: Frame rate written into the output file.

    Returns:
        ``output_vid``.
    """
    print("RESIZING ...")

    video = cv2.VideoCapture(input_vid)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec for the output video
    output_video = cv2.VideoWriter(output_vid, fourcc, fps, (width, height))

    try:
        while True:
            ret, frame = video.read()
            if not ret:
                break
            output_video.write(cv2.resize(frame, (width, height)))
    finally:
        # Release both handles even if a frame fails to process, so the
        # output file is closed.
        video.release()
        output_video.release()

    print("RESIZE VIDEO DONE!")
    return output_vid


def normalize_and_save_video(input_video_path, output_video_path):
    """Re-encode a video after passing each frame through a [0, 1] round-trip.

    Every frame is converted to float32, divided by 255, multiplied by 255
    and cast back to uint8 before being written with the mp4v codec.

    Args:
        input_video_path: Path of the source video.
        output_video_path: Path the processed video is written to.

    Returns:
        ``output_video_path``.
    """
    print("NORMALIZING ...")

    cap = cv2.VideoCapture(input_video_path)

    # The output keeps the source geometry and frame rate.
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

    try:
        for _ in range(frame_count):
            ret, frame = cap.read()
            if not ret:
                break

            # NOTE(review): dividing by 255 and multiplying back is at best
            # an identity on 8-bit pixels (the uint8 cast truncates, so float
            # rounding may shift some values by one level). Kept as-is to
            # preserve the existing output; confirm whether a real
            # normalization was intended.
            frame = frame.astype(np.float32)
            frame /= 255.0
            frame = (frame * 255.0).astype(np.uint8)

            out.write(frame)
    finally:
        cap.release()
        out.release()

    print("NORMALIZE DONE!")
    return output_video_path


def chunkify(video_path, fps, nb_frames, frames_per_chunk=12):
    """Split a video into consecutive mp4v chunk files in the working directory.

    Chunks are named ``chunk_<start>-<end>.mp4`` where ``<end>`` is exclusive.

    Args:
        video_path: Path of the video to split.
        fps: Frame rate written into every chunk.
        nb_frames: Number of frames to consume from the video.
        frames_per_chunk: Maximum number of frames per chunk.

    Returns:
        List of chunk file names, in order.

    Raises:
        ValueError: If ``frames_per_chunk`` is not positive.
    """
    if frames_per_chunk <= 0:
        raise ValueError("frames_per_chunk must be a positive integer")

    chunks_array = []
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    video_capture = cv2.VideoCapture(video_path)

    try:
        chunk_start_frame = 0
        while chunk_start_frame < nb_frames:
            chunk_end_frame = min(chunk_start_frame + frames_per_chunk, nb_frames)

            # Frames are consumed strictly in order, so a plain sequential
            # read replaces a seek before every frame; the first frame of the
            # chunk also provides the writer's frame size.
            success, frame = video_capture.read()
            if not success:
                break

            chunk_name = f"chunk_{chunk_start_frame}-{chunk_end_frame}.mp4"
            chunk_video = cv2.VideoWriter(
                chunk_name, fourcc, fps, (frame.shape[1], frame.shape[0])
            )
            try:
                chunk_video.write(frame)
                for _ in range(chunk_start_frame + 1, chunk_end_frame):
                    success, frame = video_capture.read()
                    if not success:
                        break
                    chunk_video.write(frame)
            finally:
                chunk_video.release()

            chunks_array.append(chunk_name)
            if not success:
                # The stream ended before nb_frames; keep the partial chunk
                # and stop.
                break
            chunk_start_frame += frames_per_chunk
    finally:
        video_capture.release()

    print(f"CHUNKS: {chunks_array}")
    return chunks_array


def _build_inference_command(prompt, condition, video_path, output_path,
                             temp_chunk_path, fps, video_length, is_long_video):
    """Return the ``inference.py`` invocation as an argument list.

    The command is a list (executed without a shell) so that user-supplied
    text such as the prompt is passed verbatim as a single argument and can
    never be interpreted as shell syntax.
    """
    command = [
        "python", "inference.py",
        "--prompt", str(prompt),
        "--condition", str(condition),
        "--video_path", str(video_path),
        "--output_path", str(output_path),
        "--temp_chunk_path", str(temp_chunk_path),
        "--width", "512",
        "--height", "512",
        "--fps", str(fps),
        # The slider uses step=1; cast so a float such as 2.0 is still passed
        # as "2".
        "--video_length", str(int(video_length)),
    ]
    if is_long_video:
        command.append("--is_long_video")
    return command


def run_inference_by_chunkify(prompt, video_path, condition, video_length):
    """Experimental chunked inference; flagged by its author as not working.

    Resizes the video to 512x512, splits it into 12-frame chunks and runs
    ``inference.py`` on the first chunk only. Not wired to the UI.

    Args:
        prompt: Text prompt forwarded to ``inference.py``.
        video_path: Path of the input video.
        condition: Condition name forwarded to ``inference.py``.
        video_length: Number of frames forwarded to ``inference.py``.

    Returns:
        Tuple ``(status, video_path)``; the path is ``None`` when no chunk
        was produced.
    """
    # DOESN'T WORK
    _, _, target_fps, total_frames = get_video_dimension(video_path)
    print(f"INPUT FPS: {target_fps}")

    resized = resize_video(video_path, 'resized.mp4', 512, 512, target_fps)
    chunks = chunkify(resized, target_fps, total_frames)

    output_path = 'output/'
    os.makedirs(output_path, exist_ok=True)

    processed_chunks = []
    for index, chunk_path in enumerate(chunks):
        if index != 0:
            # Only the first chunk is processed in this experimental path.
            print("finished")
            continue

        print(f"Chunk #{index}: {chunk_path}")
        video_path_output = os.path.join(output_path, f"{index}.mp4")

        # Drop any stale result from a previous run.
        if os.path.exists(video_path_output):
            os.remove(video_path_output)

        command = _build_inference_command(
            prompt, condition, chunk_path, output_path,
            temp_chunk_path=index, fps=8, video_length=video_length,
            is_long_video=True,
        )
        subprocess.run(command)

        processed_chunks.append(video_path_output)

    print(f"PROCESSED CHUNKS: {processed_chunks}")
    if not processed_chunks:
        # Unreadable or empty video: report it instead of raising IndexError.
        return "no chunks produced", None
    return "done", processed_chunks[0]


def run_inference(prompt, video_path, condition, video_length):
    """Pre-process the uploaded video and run ``inference.py`` on it.

    The video is resized to 512x512, passed through
    ``normalize_and_save_video`` and handed to ``inference.py``; the result is
    expected at ``output/result.mp4``.

    Args:
        prompt: Text prompt typed by the user.
        video_path: Path of the uploaded video, or ``None`` if nothing was uploaded.
        condition: Condition name chosen in the dropdown.
        video_length: Number of frames to process.

    Returns:
        Tuple ``(status, video_path)``. ``status`` is ``"done"`` on success;
        on failure it describes the problem and the path is ``None``.
    """
    if not video_path:
        return "Please upload a video first.", None

    _, _, target_fps, _ = get_video_dimension(video_path)
    print(f"INPUT FPS: {target_fps}")

    resized = resize_video(video_path, 'resized.mp4', 512, 512, target_fps)
    normalized = normalize_and_save_video(resized, 'normalized.mp4')

    output_path = 'output/'
    os.makedirs(output_path, exist_ok=True)
    video_path_output = os.path.join(output_path, "result.mp4")

    # Drop any stale result so a failed run cannot show the previous video.
    if os.path.exists(video_path_output):
        os.remove(video_path_output)

    print("RUNNING INFERENCE ...")
    command = _build_inference_command(
        prompt, condition, normalized, output_path,
        temp_chunk_path='result', fps=target_fps, video_length=video_length,
        # Videos longer than 12 frames use the long-video mode.
        is_long_video=video_length > 12,
    )
    completed = subprocess.run(command)

    if completed.returncode != 0 or not os.path.exists(video_path_output):
        print(f"INFERENCE FAILED (exit code {completed.returncode})")
        return f"inference failed (exit code {completed.returncode})", None

    print("FINISHED !")
    return "done", video_path_output


css = """
#col-container {max-width: 810px; margin-left: auto; margin-right: auto;}
"""

with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        gr.Markdown("""

ControlVideo

""")
        with gr.Row():
            with gr.Column():
                #video_in = gr.Video(source="upload", type="filepath", visible=True)
                video_path = gr.Video(source="upload", type="filepath", visible=True)
                prompt = gr.Textbox(label="prompt")
                with gr.Row():
                    condition = gr.Dropdown(
                        label="Condition",
                        choices=["depth", "canny", "pose"],
                        value="depth",
                    )
                    video_length = gr.Slider(
                        label="Video length",
                        info="How many frames do you want to process ?",
                        minimum=1,
                        maximum=12,
                        step=1,
                        value=2,
                    )
                    #seed = gr.Number(label="seed", value=42)
                submit_btn = gr.Button("Submit")
            with gr.Column():
                video_res = gr.Video(label="result")
                status = gr.Textbox(label="result")

    # Raise the slider's upper bound to the uploaded video's frame count.
    video_path.change(
        fn=get_frame_count,
        inputs=[video_path],
        outputs=[video_length],
    )
    submit_btn.click(
        fn=run_inference,
        inputs=[prompt, video_path, condition, video_length],
        outputs=[status, video_res],
    )

demo.queue(max_size=12).launch()