ControlVideo / app.py
fffiloni's picture
Update app.py
0b7a097
raw
history blame
9.58 kB
import gradio as gr
import os
import subprocess
import cv2
import numpy as np
from moviepy.editor import VideoFileClip, concatenate_videoclips
import math
from huggingface_hub import snapshot_download
model_ids = [
'runwayml/stable-diffusion-v1-5',
'lllyasviel/sd-controlnet-depth',
'lllyasviel/sd-controlnet-canny',
'lllyasviel/sd-controlnet-openpose',
]
for model_id in model_ids:
model_name = model_id.split('/')[-1]
snapshot_download(model_id, local_dir=f'checkpoints/{model_name}')
def get_frame_count(filepath):
video = cv2.VideoCapture(filepath)
frame_count = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
video.release()
return gr.update(maximum=frame_count)
def get_video_dimension(filepath):
video = cv2.VideoCapture(filepath)
width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(video.get(cv2.CAP_PROP_FPS))
frame_count = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
video.release()
return width, height, fps, frame_count
def resize_video(input_vid, output_vid, width, height, fps):
print(f"RESIZING ...")
# Open the input video file
video = cv2.VideoCapture(input_vid)
# Get the original video's width and height
original_width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
original_height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Create a VideoWriter object to write the resized video
fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Codec for the output video
output_video = cv2.VideoWriter(output_vid, fourcc, fps, (width, height))
while True:
# Read a frame from the input video
ret, frame = video.read()
if not ret:
break
# Resize the frame to the desired dimensions
resized_frame = cv2.resize(frame, (width, height))
# Write the resized frame to the output video file
output_video.write(resized_frame)
# Release the video objects
video.release()
output_video.release()
print(f"RESIZE VIDEO DONE!")
return output_vid
def normalize_and_save_video(input_video_path, output_video_path):
print(f"NORMALIZING ...")
cap = cv2.VideoCapture(input_video_path)
# Get video properties
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
# Create VideoWriter object to save the normalized video
fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Specify the codec (e.g., 'mp4v', 'XVID', 'MPEG')
out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
# Iterate through each frame in the video
for _ in range(frame_count):
ret, frame = cap.read()
if not ret:
break
# Convert frame to floating point
frame = frame.astype(np.float32)
# Normalize pixel values to the range [0, 1]
frame /= 255.0
# Convert normalized frame back to 8-bit unsigned integer
frame = (frame * 255.0).astype(np.uint8)
# Write the normalized frame to the output video file
out.write(frame)
# Release the VideoCapture and VideoWriter objects
cap.release()
out.release()
print(f"NORMALIZE DONE!")
return output_video_path
def chunkify(video_path, fps, nb_frames):
chunks_array = []
video_capture = cv2.VideoCapture(video_path)
chunk_start_frame = 0
frames_per_chunk = 12
while chunk_start_frame < nb_frames:
chunk_end_frame = min(chunk_start_frame + frames_per_chunk, nb_frames)
video_capture.set(cv2.CAP_PROP_POS_FRAMES, chunk_start_frame)
success, frame = video_capture.read()
if not success:
break
chunk_name = f"chunk_{chunk_start_frame}-{chunk_end_frame}.mp4"
chunk_video = cv2.VideoWriter(chunk_name, cv2.VideoWriter_fourcc(*"mp4v"), fps, (frame.shape[1], frame.shape[0]))
for frame_number in range(chunk_start_frame, chunk_end_frame):
video_capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
success, frame = video_capture.read()
if not success:
break
chunk_video.write(frame)
chunk_video.release()
chunks_array.append(chunk_name)
chunk_start_frame += frames_per_chunk
video_capture.release()
print(f"CHUNKS: {chunks_array}")
return chunks_array
def run_inference_by_chunkify(prompt, video_path, condition, video_length):
# DOESN'T WORK
# Get FPS of original video input
target_fps = get_video_dimension(video_path)[2]
print(f"INPUT FPS: {target_fps}")
# Count total frames according to fps
total_frames = get_video_dimension(video_path)[3]
# Resize the video
resized = resize_video(video_path, 'resized.mp4', 512, 512, target_fps)
# Chunkify the video into 12 frames chunks
chunks = chunkify(resized, target_fps, total_frames)
output_path = 'output/'
os.makedirs(output_path, exist_ok=True)
processed_chunks = []
for index, chunk_path in enumerate(chunks):
if index == 0 :
print(f"Chunk #{index}: {chunk_path}")
# Check if the file already exists
if os.path.exists(os.path.join(output_path, f"{index}.mp4")):
# Delete the existing file
os.remove(os.path.join(output_path, f"{index}.mp4"))
#if video_length > 12:
# command = f"python inference.py --prompt '{prompt}' --condition '{condition}' --video_path '{video_path}' --output_path '{output_path}' --width 512 --height 512 --fps 8 --video_length {video_length} --is_long_video"
#else:
command = f"python inference.py --prompt '{prompt}' --condition '{condition}' --video_path '{chunk_path}' --output_path '{output_path}' --temp_chunk_path '{index}' --width 512 --height 512 --fps 8 --video_length {video_length} --is_long_video"
subprocess.run(command, shell=True)
# Construct the video path
video_path_output = os.path.join(output_path, f"{index}.mp4")
# Append processed chunk to final array
processed_chunks.append(video_path_output)
else:
print("finished")
print(f"PROCESSED CHUNKS: {processed_chunks}")
return "done", processed_chunks[0]
def run_inference(prompt, video_path, condition, video_length):
# Get FPS of original video input
target_fps = get_video_dimension(video_path)[2]
print(f"INPUT FPS: {target_fps}")
# Count total frames according to fps
total_frames = get_video_dimension(video_path)[3]
# Resize the video
resized = resize_video(video_path, 'resized.mp4', 512, 512, target_fps)
# normalize pixels
normalized = normalize_and_save_video(resized, 'normalized.mp4')
output_path = 'output/'
os.makedirs(output_path, exist_ok=True)
# Check if the file already exists
if os.path.exists(os.path.join(output_path, f"result.mp4")):
# Delete the existing file
os.remove(os.path.join(output_path, f"result.mp4"))
print(f"RUNNING INFERENCE ...")
if video_length > 12:
command = f"python inference.py --prompt '{prompt}' --condition '{condition}' --video_path '{normalized}' --output_path '{output_path}' --temp_chunk_path 'result' --width 512 --height 512 --fps {target_fps} --video_length {video_length} --is_long_video"
else:
command = f"python inference.py --prompt '{prompt}' --condition '{condition}' --video_path '{normalized}' --output_path '{output_path}' --temp_chunk_path 'result' --width 512 --height 512 --fps {target_fps} --video_length {video_length}"
subprocess.run(command, shell=True)
# Construct the video path
video_path_output = os.path.join(output_path, f"result.mp4")
print(f"FINISHED !")
return "done", video_path_output
css="""
#col-container {max-width: 810px; margin-left: auto; margin-right: auto;}
"""
with gr.Blocks(css=css) as demo:
with gr.Column(elem_id="col-container"):
gr.Markdown("""
<h1 style="text-align: center;">ControlVideo</h1>
""")
with gr.Row():
with gr.Column():
#video_in = gr.Video(source="upload", type="filepath", visible=True)
video_path = gr.Video(source="upload", type="filepath", visible=True)
prompt = gr.Textbox(label="prompt")
with gr.Row():
condition = gr.Dropdown(label="Condition", choices=["depth", "canny", "pose"], value="depth")
video_length = gr.Slider(label="Video length", info="How many frames do you want to process ?", minimum=1, maximum=12, step=1, value=2)
#seed = gr.Number(label="seed", value=42)
submit_btn = gr.Button("Submit")
with gr.Column():
video_res = gr.Video(label="result")
status = gr.Textbox(label="result")
video_path.change(fn=get_frame_count,
inputs=[video_path],
outputs=[video_length]
)
submit_btn.click(fn=run_inference,
inputs=[prompt,
video_path,
condition,
video_length
],
outputs=[status, video_res])
demo.queue(max_size=12).launch()