"""Modified from https://github.com/kijai/ComfyUI-EasyAnimateWrapper/blob/main/nodes.py
"""
import gc
import os

import comfy.model_management as mm
import cv2
import folder_paths
import numpy as np
import torch
from comfy.utils import ProgressBar
from diffusers import (DDIMScheduler, DPMSolverMultistepScheduler,
                       EulerAncestralDiscreteScheduler, EulerDiscreteScheduler,
                       PNDMScheduler)
from einops import rearrange
from PIL import Image

from ..cogvideox.data.bucket_sampler import ASPECT_RATIO_512, get_closest_ratio
from ..cogvideox.models.autoencoder_magvit import AutoencoderKLCogVideoX
from ..cogvideox.models.transformer3d import CogVideoXTransformer3DModel
from ..cogvideox.pipeline.pipeline_cogvideox import CogVideoX_Fun_Pipeline
from ..cogvideox.pipeline.pipeline_cogvideox_control import \
    CogVideoX_Fun_Pipeline_Control
from ..cogvideox.pipeline.pipeline_cogvideox_inpaint import \
    CogVideoX_Fun_Pipeline_Inpaint
from ..cogvideox.utils.lora_utils import merge_lora, unmerge_lora
from ..cogvideox.utils.utils import (get_image_to_video_latent,
                                     get_video_to_video_latent)

# Compatible with Alibaba EAS for quick launch
eas_cache_dir = '/stable-diffusion-cache/models'
# Root directory of the CogVideoX-Fun package
script_directory = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
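
# ComfyUI passes IMAGE inputs as float tensors in [0, 1] with shape
# [batch, height, width, channels]; the helpers below rescale to 8-bit RGB
# and convert individual frames to PIL images for the pipelines.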
def tensor2pil(image):
return Image.fromarray(np.clip(255. * image.cpu().numpy(), 0, 255).astype(np.uint8))
def numpy2pil(image):
return Image.fromarray(np.clip(255. * image, 0, 255).astype(np.uint8))
def to_pil(image):
if isinstance(image, Image.Image):
return image
if isinstance(image, torch.Tensor):
return tensor2pil(image)
if isinstance(image, np.ndarray):
return numpy2pil(image)
    raise ValueError(f"Cannot convert {type(image)} to PIL.Image")

class LoadCogVideoX_Fun_Model:
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"model": (
[
'CogVideoX-Fun-2b-InP',
'CogVideoX-Fun-5b-InP',
'CogVideoX-Fun-V1.1-2b-InP',
'CogVideoX-Fun-V1.1-5b-InP',
'CogVideoX-Fun-V1.1-2b-Pose',
'CogVideoX-Fun-V1.1-5b-Pose',
],
{
"default": 'CogVideoX-Fun-V1.1-2b-InP',
}
),
"model_type": (
["Inpaint", "Control"],
{
"default": "Inpaint",
}
),
"low_gpu_memory_mode":(
[False, True],
{
"default": False,
}
),
"precision": (
['fp16', 'bf16'],
{
"default": 'fp16'
}
),
},
}
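    # Standard ComfyUI node hooks; the custom CogVideoXFUNSMODEL type carries the
    # loaded pipeline and its settings between nodes.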
RETURN_TYPES = ("CogVideoXFUNSMODEL",)
RETURN_NAMES = ("cogvideoxfun_model",)
FUNCTION = "loadmodel"
CATEGORY = "CogVideoXFUNWrapper"
def loadmodel(self, low_gpu_memory_mode, model, model_type, precision):
# Init weight_dtype and device
device = mm.get_torch_device()
offload_device = mm.unet_offload_device()
weight_dtype = {"bf16": torch.bfloat16, "fp16": torch.float16, "fp32": torch.float32}[precision]
        # Init the progress bar: one step each for the VAE, scheduler and transformer
        pbar = ProgressBar(3)
        # Resolve the local model path, accepting both directory spellings
        # and falling back to the EAS cache
        model_path = os.path.join(folder_paths.models_dir, "CogVideoX_Fun", model)
        if not os.path.exists(model_path):
            model_path = os.path.join(folder_paths.models_dir, "CogVideoX-Fun", model)
        if not os.path.exists(model_path):
            if os.path.exists(eas_cache_dir):
                model_path = os.path.join(eas_cache_dir, 'CogVideoX_Fun', model)
            else:
                print(f"Please download the CogVideoX-Fun model to: {model_path}")
        # Load the VAE
        vae = AutoencoderKLCogVideoX.from_pretrained(
            model_path,
            subfolder="vae",
        ).to(weight_dtype)
        pbar.update(1)
        # Load the default scheduler (the sampler nodes may swap it out later)
        print("Load Scheduler.")
        scheduler = EulerDiscreteScheduler.from_pretrained(model_path, subfolder='scheduler')
        pbar.update(1)
        # Load the transformer
        transformer = CogVideoXTransformer3DModel.from_pretrained_2d(
            model_path,
            subfolder="transformer",
        ).to(weight_dtype)
        pbar.update(1)
        # Build the pipeline. Inpaint checkpoints take extra input channels for the
        # mask conditioning, which is how they are distinguished from plain weights.
        if model_type == "Inpaint":
            if transformer.config.in_channels != vae.config.latent_channels:
pipeline = CogVideoX_Fun_Pipeline_Inpaint.from_pretrained(
model_path,
vae=vae,
transformer=transformer,
scheduler=scheduler,
torch_dtype=weight_dtype
)
else:
pipeline = CogVideoX_Fun_Pipeline.from_pretrained(
model_path,
vae=vae,
transformer=transformer,
scheduler=scheduler,
torch_dtype=weight_dtype
)
else:
pipeline = CogVideoX_Fun_Pipeline_Control.from_pretrained(
model_path,
vae=vae,
transformer=transformer,
scheduler=scheduler,
torch_dtype=weight_dtype
)
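        # Sequential offload streams submodules to the GPU one at a time (lowest
        # VRAM, slowest); model offload keeps each whole model on the GPU while in use.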
if low_gpu_memory_mode:
pipeline.enable_sequential_cpu_offload()
else:
pipeline.enable_model_cpu_offload()
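        # Bundle everything the sampler nodes need; 'loras' and 'strength_model'
        # start empty and are appended to by LoadCogVideoX_Fun_Lora.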
cogvideoxfun_model = {
'pipeline': pipeline,
'dtype': weight_dtype,
'model_path': model_path,
'model_type': model_type,
'loras': [],
'strength_model': [],
}
        return (cogvideoxfun_model,)

class LoadCogVideoX_Fun_Lora:
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"cogvideoxfun_model": ("CogVideoXFUNSMODEL",),
"lora_name": (folder_paths.get_filename_list("loras"), {"default": None,}),
"strength_model": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01}),
}
}
RETURN_TYPES = ("CogVideoXFUNSMODEL",)
RETURN_NAMES = ("cogvideoxfun_model",)
FUNCTION = "load_lora"
CATEGORY = "CogVideoXFUNWrapper"
    def load_lora(self, cogvideoxfun_model, lora_name, strength_model):
        if lora_name is not None:
            return (
                {
                    'pipeline': cogvideoxfun_model["pipeline"],
                    'dtype': cogvideoxfun_model["dtype"],
                    'model_path': cogvideoxfun_model["model_path"],
                    # Keep the model type so downstream samplers can still read it
                    'model_type': cogvideoxfun_model["model_type"],
                    'loras': cogvideoxfun_model.get("loras", []) + [folder_paths.get_full_path("loras", lora_name)],
                    'strength_model': cogvideoxfun_model.get("strength_model", []) + [strength_model],
                },
            )
        else:
            return (cogvideoxfun_model,)

class CogVideoX_FUN_TextBox:
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"prompt": ("STRING", {"multiline": True, "default": "",}),
}
}
RETURN_TYPES = ("STRING_PROMPT",)
    RETURN_NAMES = ("prompt",)
FUNCTION = "process"
CATEGORY = "CogVideoXFUNWrapper"
def process(self, prompt):
        return (prompt,)

class CogVideoX_Fun_I2VSampler:
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"cogvideoxfun_model": (
"CogVideoXFUNSMODEL",
),
"prompt": (
"STRING_PROMPT",
),
"negative_prompt": (
"STRING_PROMPT",
),
"video_length": (
"INT", {"default": 49, "min": 5, "max": 49, "step": 4}
),
"base_resolution": (
[
512,
768,
960,
1024,
], {"default": 768}
),
"seed": (
"INT", {"default": 43, "min": 0, "max": 0xffffffffffffffff}
),
"steps": (
"INT", {"default": 50, "min": 1, "max": 200, "step": 1}
),
"cfg": (
"FLOAT", {"default": 6.0, "min": 1.0, "max": 20.0, "step": 0.01}
),
"scheduler": (
[
"Euler",
"Euler A",
"DPM++",
"PNDM",
"DDIM",
],
{
"default": 'DDIM'
}
)
},
"optional":{
"start_img": ("IMAGE",),
"end_img": ("IMAGE",),
},
}
RETURN_TYPES = ("IMAGE",)
    RETURN_NAMES = ("images",)
FUNCTION = "process"
CATEGORY = "CogVideoXFUNWrapper"
def process(self, cogvideoxfun_model, prompt, negative_prompt, video_length, base_resolution, seed, steps, cfg, scheduler, start_img=None, end_img=None):
device = mm.get_torch_device()
offload_device = mm.unet_offload_device()
mm.soft_empty_cache()
gc.collect()
start_img = [to_pil(_start_img) for _start_img in start_img] if start_img is not None else None
end_img = [to_pil(_end_img) for _end_img in end_img] if end_img is not None else None
        # Scale the 512-based aspect-ratio buckets to the chosen base resolution,
        # pick the bucket closest to the input image's aspect ratio, then round
        # each side down to a multiple of 16.
        aspect_ratio_sample_size = {key: [x / 512 * base_resolution for x in ASPECT_RATIO_512[key]] for key in ASPECT_RATIO_512.keys()}
        original_width, original_height = start_img[0].size if isinstance(start_img, list) else Image.open(start_img).size
        closest_size, closest_ratio = get_closest_ratio(original_height, original_width, ratios=aspect_ratio_sample_size)
        height, width = [int(x / 16) * 16 for x in closest_size]
# Get Pipeline
pipeline = cogvideoxfun_model['pipeline']
model_path = cogvideoxfun_model['model_path']
        # Swap in the requested noise scheduler
        scheduler_classes = {
            "DPM++": DPMSolverMultistepScheduler,
            "Euler": EulerDiscreteScheduler,
            "Euler A": EulerAncestralDiscreteScheduler,
            "PNDM": PNDMScheduler,
            "DDIM": DDIMScheduler,
        }
        noise_scheduler = scheduler_classes[scheduler].from_pretrained(model_path, subfolder='scheduler')
        pipeline.scheduler = noise_scheduler
        generator = torch.Generator(device).manual_seed(seed)
with torch.no_grad():
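            # Snap the frame count to 1 + k * temporal_compression_ratio so the
            # requested length maps onto whole latent frames in the VAE.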
video_length = int((video_length - 1) // pipeline.vae.config.temporal_compression_ratio * pipeline.vae.config.temporal_compression_ratio) + 1 if video_length != 1 else 1
input_video, input_video_mask, clip_image = get_image_to_video_latent(start_img, end_img, video_length=video_length, sample_size=(height, width))
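            # Merge any queued LoRA weights into the pipeline for this run only;
            # they are unmerged after sampling so the cached base weights stay clean.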
for _lora_path, _lora_weight in zip(cogvideoxfun_model.get("loras", []), cogvideoxfun_model.get("strength_model", [])):
pipeline = merge_lora(pipeline, _lora_path, _lora_weight)
sample = pipeline(
prompt,
num_frames = video_length,
negative_prompt = negative_prompt,
height = height,
width = width,
generator = generator,
guidance_scale = cfg,
num_inference_steps = steps,
video = input_video,
mask_video = input_video_mask,
comfyui_progressbar = True,
).videos
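            # Flatten the [batch, channels, frames, H, W] video into a ComfyUI
            # image batch shaped [(batch * frames), H, W, channels].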
videos = rearrange(sample, "b c t h w -> (b t) h w c")
for _lora_path, _lora_weight in zip(cogvideoxfun_model.get("loras", []), cogvideoxfun_model.get("strength_model", [])):
pipeline = unmerge_lora(pipeline, _lora_path, _lora_weight)
        return (videos,)

class CogVideoX_Fun_T2VSampler:
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"cogvideoxfun_model": (
"CogVideoXFUNSMODEL",
),
"prompt": (
"STRING_PROMPT",
),
"negative_prompt": (
"STRING_PROMPT",
),
"video_length": (
"INT", {"default": 49, "min": 5, "max": 49, "step": 4}
),
"width": (
"INT", {"default": 1008, "min": 64, "max": 2048, "step": 16}
),
"height": (
"INT", {"default": 576, "min": 64, "max": 2048, "step": 16}
),
"is_image":(
[
False,
True
],
{
"default": False,
}
),
"seed": (
"INT", {"default": 43, "min": 0, "max": 0xffffffffffffffff}
),
"steps": (
"INT", {"default": 50, "min": 1, "max": 200, "step": 1}
),
"cfg": (
"FLOAT", {"default": 6.0, "min": 1.0, "max": 20.0, "step": 0.01}
),
"scheduler": (
[
"Euler",
"Euler A",
"DPM++",
"PNDM",
"DDIM",
],
{
"default": 'DDIM'
}
),
},
}
RETURN_TYPES = ("IMAGE",)
    RETURN_NAMES = ("images",)
FUNCTION = "process"
CATEGORY = "CogVideoXFUNWrapper"
def process(self, cogvideoxfun_model, prompt, negative_prompt, video_length, width, height, is_image, seed, steps, cfg, scheduler):
device = mm.get_torch_device()
offload_device = mm.unet_offload_device()
mm.soft_empty_cache()
gc.collect()
# Get Pipeline
pipeline = cogvideoxfun_model['pipeline']
model_path = cogvideoxfun_model['model_path']
        # Swap in the requested noise scheduler
        scheduler_classes = {
            "DPM++": DPMSolverMultistepScheduler,
            "Euler": EulerDiscreteScheduler,
            "Euler A": EulerAncestralDiscreteScheduler,
            "PNDM": PNDMScheduler,
            "DDIM": DDIMScheduler,
        }
        noise_scheduler = scheduler_classes[scheduler].from_pretrained(model_path, subfolder='scheduler')
        pipeline.scheduler = noise_scheduler
        generator = torch.Generator(device).manual_seed(seed)
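        # In image mode, collapse generation to a single frame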
video_length = 1 if is_image else video_length
with torch.no_grad():
video_length = int((video_length - 1) // pipeline.vae.config.temporal_compression_ratio * pipeline.vae.config.temporal_compression_ratio) + 1 if video_length != 1 else 1
input_video, input_video_mask, clip_image = get_image_to_video_latent(None, None, video_length=video_length, sample_size=(height, width))
for _lora_path, _lora_weight in zip(cogvideoxfun_model.get("loras", []), cogvideoxfun_model.get("strength_model", [])):
pipeline = merge_lora(pipeline, _lora_path, _lora_weight)
sample = pipeline(
prompt,
num_frames = video_length,
negative_prompt = negative_prompt,
height = height,
width = width,
generator = generator,
guidance_scale = cfg,
num_inference_steps = steps,
video = input_video,
mask_video = input_video_mask,
comfyui_progressbar = True,
).videos
videos = rearrange(sample, "b c t h w -> (b t) h w c")
for _lora_path, _lora_weight in zip(cogvideoxfun_model.get("loras", []), cogvideoxfun_model.get("strength_model", [])):
pipeline = unmerge_lora(pipeline, _lora_path, _lora_weight)
        return (videos,)

class CogVideoX_Fun_V2VSampler:
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"cogvideoxfun_model": (
"CogVideoXFUNSMODEL",
),
"prompt": (
"STRING_PROMPT",
),
"negative_prompt": (
"STRING_PROMPT",
),
"video_length": (
"INT", {"default": 49, "min": 5, "max": 49, "step": 4}
),
"base_resolution": (
[
512,
768,
960,
1024,
], {"default": 768}
),
"seed": (
"INT", {"default": 43, "min": 0, "max": 0xffffffffffffffff}
),
"steps": (
"INT", {"default": 50, "min": 1, "max": 200, "step": 1}
),
"cfg": (
"FLOAT", {"default": 6.0, "min": 1.0, "max": 20.0, "step": 0.01}
),
"denoise_strength": (
"FLOAT", {"default": 0.70, "min": 0.05, "max": 1.00, "step": 0.01}
),
"scheduler": (
[
"Euler",
"Euler A",
"DPM++",
"PNDM",
"DDIM",
],
{
"default": 'DDIM'
}
),
},
"optional":{
"validation_video": ("IMAGE",),
"control_video": ("IMAGE",),
},
}
RETURN_TYPES = ("IMAGE",)
    RETURN_NAMES = ("images",)
FUNCTION = "process"
CATEGORY = "CogVideoXFUNWrapper"
def process(self, cogvideoxfun_model, prompt, negative_prompt, video_length, base_resolution, seed, steps, cfg, denoise_strength, scheduler, validation_video=None, control_video=None):
device = mm.get_torch_device()
offload_device = mm.unet_offload_device()
mm.soft_empty_cache()
gc.collect()
# Get Pipeline
pipeline = cogvideoxfun_model['pipeline']
model_path = cogvideoxfun_model['model_path']
model_type = cogvideoxfun_model['model_type']
        # Scale the 512-based aspect-ratio buckets to the chosen base resolution
        aspect_ratio_sample_size = {key: [x / 512 * base_resolution for x in ASPECT_RATIO_512[key]] for key in ASPECT_RATIO_512.keys()}
        # Read the first frame of the source video just to measure its resolution
        if model_type == "Inpaint":
            if isinstance(validation_video, str):
                original_width, original_height = Image.fromarray(cv2.VideoCapture(validation_video).read()[1]).size
            else:
                validation_video = np.array(validation_video.cpu().numpy() * 255, np.uint8)
                original_width, original_height = Image.fromarray(validation_video[0]).size
        else:
            if isinstance(control_video, str):
                original_width, original_height = Image.fromarray(cv2.VideoCapture(control_video).read()[1]).size
            else:
                control_video = np.array(control_video.cpu().numpy() * 255, np.uint8)
                original_width, original_height = Image.fromarray(control_video[0]).size
        # Pick the closest bucket and round each side down to a multiple of 16
        closest_size, closest_ratio = get_closest_ratio(original_height, original_width, ratios=aspect_ratio_sample_size)
        height, width = [int(x / 16) * 16 for x in closest_size]
        # Swap in the requested noise scheduler
        scheduler_classes = {
            "DPM++": DPMSolverMultistepScheduler,
            "Euler": EulerDiscreteScheduler,
            "Euler A": EulerAncestralDiscreteScheduler,
            "PNDM": PNDMScheduler,
            "DDIM": DDIMScheduler,
        }
        noise_scheduler = scheduler_classes[scheduler].from_pretrained(model_path, subfolder='scheduler')
        pipeline.scheduler = noise_scheduler
        generator = torch.Generator(device).manual_seed(seed)
with torch.no_grad():
video_length = int((video_length - 1) // pipeline.vae.config.temporal_compression_ratio * pipeline.vae.config.temporal_compression_ratio) + 1 if video_length != 1 else 1
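            # Inpaint mode re-noises the reference video and denoises it at the given
            # strength; Control mode instead conditions generation on the control video.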
if model_type == "Inpaint":
input_video, input_video_mask, clip_image = get_video_to_video_latent(validation_video, video_length=video_length, sample_size=(height, width), fps=8)
else:
input_video, input_video_mask, clip_image = get_video_to_video_latent(control_video, video_length=video_length, sample_size=(height, width), fps=8)
for _lora_path, _lora_weight in zip(cogvideoxfun_model.get("loras", []), cogvideoxfun_model.get("strength_model", [])):
pipeline = merge_lora(pipeline, _lora_path, _lora_weight)
if model_type == "Inpaint":
sample = pipeline(
prompt,
num_frames = video_length,
negative_prompt = negative_prompt,
height = height,
width = width,
generator = generator,
guidance_scale = cfg,
num_inference_steps = steps,
video = input_video,
mask_video = input_video_mask,
strength = float(denoise_strength),
comfyui_progressbar = True,
).videos
else:
sample = pipeline(
prompt,
num_frames = video_length,
negative_prompt = negative_prompt,
height = height,
width = width,
generator = generator,
guidance_scale = cfg,
num_inference_steps = steps,
control_video = input_video,
comfyui_progressbar = True,
).videos
videos = rearrange(sample, "b c t h w -> (b t) h w c")
for _lora_path, _lora_weight in zip(cogvideoxfun_model.get("loras", []), cogvideoxfun_model.get("strength_model", [])):
pipeline = unmerge_lora(pipeline, _lora_path, _lora_weight)
        return (videos,)

NODE_CLASS_MAPPINGS = {
"CogVideoX_FUN_TextBox": CogVideoX_FUN_TextBox,
"LoadCogVideoX_Fun_Model": LoadCogVideoX_Fun_Model,
"LoadCogVideoX_Fun_Lora": LoadCogVideoX_Fun_Lora,
"CogVideoX_Fun_I2VSampler": CogVideoX_Fun_I2VSampler,
"CogVideoX_Fun_T2VSampler": CogVideoX_Fun_T2VSampler,
"CogVideoX_Fun_V2VSampler": CogVideoX_Fun_V2VSampler,
}

NODE_DISPLAY_NAME_MAPPINGS = {
    "CogVideoX_FUN_TextBox": "CogVideoX-Fun TextBox",
"LoadCogVideoX_Fun_Model": "Load CogVideoX-Fun Model",
"LoadCogVideoX_Fun_Lora": "Load CogVideoX-Fun Lora",
"CogVideoX_Fun_I2VSampler": "CogVideoX-Fun Sampler for Image to Video",
"CogVideoX_Fun_T2VSampler": "CogVideoX-Fun Sampler for Text to Video",
"CogVideoX_Fun_V2VSampler": "CogVideoX-Fun Sampler for Video to Video",
}
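
# A typical graph wires these nodes as:
#   LoadCogVideoX_Fun_Model -> (optional) LoadCogVideoX_Fun_Lora ->
#   CogVideoX_Fun_I2VSampler / CogVideoX_Fun_T2VSampler / CogVideoX_Fun_V2VSampler,
# with CogVideoX_FUN_TextBox feeding the STRING_PROMPT inputs and the resulting
# IMAGE batch going to any downstream video-combine or save node.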