Spaces:
Running
on
Zero
Running
on
Zero
import torch | |
from . import a1111_compat | |
import comfy | |
from .libs import common | |
from comfy import model_management | |
from comfy.samplers import CFGGuider | |
from comfy_extras.nodes_perpneg import Guider_PerpNeg | |
import math | |
class KSampler_progress(a1111_compat.KSampler_inspire): | |
def INPUT_TYPES(s): | |
return {"required": { | |
"model": ("MODEL",), | |
"seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}), | |
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}), | |
"cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0}), | |
"sampler_name": (comfy.samplers.KSampler.SAMPLERS, ), | |
"scheduler": (common.SCHEDULERS, ), | |
"positive": ("CONDITIONING", ), | |
"negative": ("CONDITIONING", ), | |
"latent_image": ("LATENT", ), | |
"denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}), | |
"noise_mode": (["GPU(=A1111)", "CPU"],), | |
"interval": ("INT", {"default": 1, "min": 1, "max": 10000}), | |
"omit_start_latent": ("BOOLEAN", {"default": True, "label_on": "True", "label_off": "False"}), | |
"omit_final_latent": ("BOOLEAN", {"default": False, "label_on": "True", "label_off": "False"}), | |
}, | |
"optional": { | |
"scheduler_func_opt": ("SCHEDULER_FUNC",), | |
} | |
} | |
CATEGORY = "InspirePack/analysis" | |
RETURN_TYPES = ("LATENT", "LATENT") | |
RETURN_NAMES = ("latent", "progress_latent") | |
def doit(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise, noise_mode, | |
interval, omit_start_latent, omit_final_latent, scheduler_func_opt=None): | |
adv_steps = int(steps / denoise) | |
if omit_start_latent: | |
result = [] | |
else: | |
result = [latent_image['samples']] | |
def progress_callback(step, x0, x, total_steps): | |
if (total_steps-1) != step and step % interval != 0: | |
return | |
x = model.model.process_latent_out(x) | |
x = x.cpu() | |
result.append(x) | |
latent_image, noise = a1111_compat.KSamplerAdvanced_inspire.sample(model, True, seed, adv_steps, cfg, sampler_name, scheduler, positive, negative, latent_image, (adv_steps-steps), | |
adv_steps, noise_mode, False, callback=progress_callback, scheduler_func_opt=scheduler_func_opt) | |
if not omit_final_latent: | |
result.append(latent_image['samples'].cpu()) | |
if len(result) > 0: | |
result = torch.cat(result) | |
result = {'samples': result} | |
else: | |
result = latent_image | |
return latent_image, result | |
class KSamplerAdvanced_progress(a1111_compat.KSamplerAdvanced_inspire): | |
def INPUT_TYPES(s): | |
return {"required": { | |
"model": ("MODEL",), | |
"add_noise": ("BOOLEAN", {"default": True, "label_on": "enable", "label_off": "disable"}), | |
"noise_seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}), | |
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}), | |
"cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step":0.5, "round": 0.01}), | |
"sampler_name": (comfy.samplers.KSampler.SAMPLERS, ), | |
"scheduler": (common.SCHEDULERS, ), | |
"positive": ("CONDITIONING", ), | |
"negative": ("CONDITIONING", ), | |
"latent_image": ("LATENT", ), | |
"start_at_step": ("INT", {"default": 0, "min": 0, "max": 10000}), | |
"end_at_step": ("INT", {"default": 10000, "min": 0, "max": 10000}), | |
"noise_mode": (["GPU(=A1111)", "CPU"],), | |
"return_with_leftover_noise": ("BOOLEAN", {"default": False, "label_on": "enable", "label_off": "disable"}), | |
"interval": ("INT", {"default": 1, "min": 1, "max": 10000}), | |
"omit_start_latent": ("BOOLEAN", {"default": False, "label_on": "True", "label_off": "False"}), | |
"omit_final_latent": ("BOOLEAN", {"default": False, "label_on": "True", "label_off": "False"}), | |
}, | |
"optional": { | |
"prev_progress_latent_opt": ("LATENT",), | |
"scheduler_func_opt": ("SCHEDULER_FUNC",), | |
} | |
} | |
FUNCTION = "doit" | |
CATEGORY = "InspirePack/analysis" | |
RETURN_TYPES = ("LATENT", "LATENT") | |
RETURN_NAMES = ("latent", "progress_latent") | |
def doit(self, model, add_noise, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, | |
start_at_step, end_at_step, noise_mode, return_with_leftover_noise, interval, omit_start_latent, omit_final_latent, | |
prev_progress_latent_opt=None, scheduler_func_opt=None): | |
if omit_start_latent: | |
result = [] | |
else: | |
result = [latent_image['samples']] | |
def progress_callback(step, x0, x, total_steps): | |
if (total_steps-1) != step and step % interval != 0: | |
return | |
x = model.model.process_latent_out(x) | |
x = x.cpu() | |
result.append(x) | |
latent_image, noise = a1111_compat.KSamplerAdvanced_inspire.sample(model, add_noise, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, start_at_step, end_at_step, | |
noise_mode, False, callback=progress_callback, scheduler_func_opt=scheduler_func_opt) | |
if not omit_final_latent: | |
result.append(latent_image['samples'].cpu()) | |
if len(result) > 0: | |
result = torch.cat(result) | |
result = {'samples': result} | |
else: | |
result = latent_image | |
if prev_progress_latent_opt is not None: | |
result['samples'] = torch.cat((prev_progress_latent_opt['samples'], result['samples']), dim=0) | |
return latent_image, result | |
def exponential_interpolation(from_cfg, to_cfg, i, steps): | |
if i == steps-1: | |
return to_cfg | |
if from_cfg == to_cfg: | |
return from_cfg | |
if from_cfg == 0: | |
return to_cfg * (1 - math.exp(-5 * i / steps)) / (1 - math.exp(-5)) | |
elif to_cfg == 0: | |
return from_cfg * (math.exp(-5 * i / steps) - math.exp(-5)) / (1 - math.exp(-5)) | |
else: | |
log_from = math.log(from_cfg) | |
log_to = math.log(to_cfg) | |
log_value = log_from + (log_to - log_from) * i / steps | |
return math.exp(log_value) | |
def logarithmic_interpolation(from_cfg, to_cfg, i, steps): | |
if i == 0: | |
return from_cfg | |
if i == steps-1: | |
return to_cfg | |
log_i = math.log(i + 1) | |
log_steps = math.log(steps + 1) | |
t = log_i / log_steps | |
return from_cfg + (to_cfg - from_cfg) * t | |
def cosine_interpolation(from_cfg, to_cfg, i, steps): | |
if (i == 0) or (i == steps-1): | |
return from_cfg | |
t = (1.0 + math.cos(math.pi*2*(i/steps))) / 2 | |
return from_cfg + (to_cfg - from_cfg) * t | |
class Guider_scheduled(CFGGuider): | |
def __init__(self, model_patcher, sigmas, from_cfg, to_cfg, schedule): | |
super().__init__(model_patcher) | |
self.default_cfg = self.cfg | |
self.sigmas = sigmas | |
self.cfg_sigmas = None | |
self.cfg_sigmas_i = None | |
self.from_cfg = from_cfg | |
self.to_cfg = to_cfg | |
self.schedule = schedule | |
self.last_i = 0 | |
self.renew_cfg_sigmas() | |
def set_cfg(self, cfg): | |
self.default_cfg = cfg | |
self.renew_cfg_sigmas() | |
def renew_cfg_sigmas(self): | |
self.cfg_sigmas = {} | |
self.cfg_sigmas_i = {} | |
i = 0 | |
steps = len(self.sigmas) - 1 | |
for x in self.sigmas: | |
k = float(x) | |
delta = self.to_cfg - self.from_cfg | |
if self.schedule == 'exp': | |
self.cfg_sigmas[k] = exponential_interpolation(self.from_cfg, self.to_cfg, i, steps), i | |
elif self.schedule == 'log': | |
self.cfg_sigmas[k] = logarithmic_interpolation(self.from_cfg, self.to_cfg, i, steps), i | |
elif self.schedule == 'cos': | |
self.cfg_sigmas[k] = cosine_interpolation(self.from_cfg, self.to_cfg, i, steps), i | |
else: | |
self.cfg_sigmas[k] = self.from_cfg + delta * i / steps, i | |
self.cfg_sigmas_i[i] = self.cfg_sigmas[k] | |
i += 1 | |
def predict_noise(self, x, timestep, model_options={}, seed=None): | |
k = float(timestep[0]) | |
v = self.cfg_sigmas.get(k) | |
if v is None: | |
# fallback | |
v = self.cfg_sigmas_i[self.last_i+1] | |
self.cfg_sigmas[k] = v | |
self.last_i = v[1] | |
self.cfg = v[0] | |
return super().predict_noise(x, timestep, model_options, seed) | |
class Guider_PerpNeg_scheduled(Guider_PerpNeg): | |
def __init__(self, model_patcher, sigmas, from_cfg, to_cfg, schedule, neg_scale): | |
super().__init__(model_patcher) | |
self.default_cfg = self.cfg | |
self.sigmas = sigmas | |
self.cfg_sigmas = None | |
self.cfg_sigmas_i = None | |
self.from_cfg = from_cfg | |
self.to_cfg = to_cfg | |
self.schedule = schedule | |
self.neg_scale = neg_scale | |
self.last_i = 0 | |
self.renew_cfg_sigmas() | |
def set_cfg(self, cfg): | |
self.default_cfg = cfg | |
self.renew_cfg_sigmas() | |
def renew_cfg_sigmas(self): | |
self.cfg_sigmas = {} | |
self.cfg_sigmas_i = {} | |
i = 0 | |
steps = len(self.sigmas) - 1 | |
for x in self.sigmas: | |
k = float(x) | |
delta = self.to_cfg - self.from_cfg | |
if self.schedule == 'exp': | |
self.cfg_sigmas[k] = exponential_interpolation(self.from_cfg, self.to_cfg, i, steps), i | |
elif self.schedule == 'log': | |
self.cfg_sigmas[k] = logarithmic_interpolation(self.from_cfg, self.to_cfg, i, steps), i | |
elif self.schedule == 'cos': | |
self.cfg_sigmas[k] = cosine_interpolation(self.from_cfg, self.to_cfg, i, steps), i | |
else: | |
self.cfg_sigmas[k] = self.from_cfg + delta * i / steps, i | |
self.cfg_sigmas_i[i] = self.cfg_sigmas[k] | |
i += 1 | |
def predict_noise(self, x, timestep, model_options={}, seed=None): | |
k = float(timestep[0]) | |
v = self.cfg_sigmas.get(k) | |
if v is None: | |
# fallback | |
v = self.cfg_sigmas_i[self.last_i+1] | |
self.cfg_sigmas[k] = v | |
self.last_i = v[1] | |
self.cfg = v[0] | |
return super().predict_noise(x, timestep, model_options, seed) | |
class ScheduledCFGGuider: | |
def INPUT_TYPES(s): | |
return {"required": { | |
"model": ("MODEL", ), | |
"positive": ("CONDITIONING", ), | |
"negative": ("CONDITIONING", ), | |
"sigmas": ("SIGMAS", ), | |
"from_cfg": ("FLOAT", {"default": 6.5, "min": 0.0, "max": 100.0, "step": 0.1, "round": 0.01}), | |
"to_cfg": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 100.0, "step": 0.1, "round": 0.01}), | |
"schedule": (["linear", "log", "exp", "cos"], {'default': 'log'}) | |
} | |
} | |
RETURN_TYPES = ("GUIDER", "SIGMAS") | |
FUNCTION = "get_guider" | |
CATEGORY = "sampling/custom_sampling/guiders" | |
def get_guider(self, model, positive, negative, sigmas, from_cfg, to_cfg, schedule): | |
guider = Guider_scheduled(model, sigmas, from_cfg, to_cfg, schedule) | |
guider.set_conds(positive, negative) | |
return guider, sigmas | |
class ScheduledPerpNegCFGGuider: | |
def INPUT_TYPES(s): | |
return {"required": { | |
"model": ("MODEL", ), | |
"positive": ("CONDITIONING", ), | |
"negative": ("CONDITIONING", ), | |
"empty_conditioning": ("CONDITIONING", ), | |
"neg_scale": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 100.0, "step": 0.01}), | |
"sigmas": ("SIGMAS", ), | |
"from_cfg": ("FLOAT", {"default": 6.5, "min": 0.0, "max": 100.0, "step": 0.1, "round": 0.01}), | |
"to_cfg": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 100.0, "step": 0.1, "round": 0.01}), | |
"schedule": (["linear", "log", "exp", "cos"], {'default': 'log'}) | |
} | |
} | |
RETURN_TYPES = ("GUIDER", "SIGMAS") | |
FUNCTION = "get_guider" | |
CATEGORY = "sampling/custom_sampling/guiders" | |
def get_guider(self, model, positive, negative, empty_conditioning, neg_scale, sigmas, from_cfg, to_cfg, schedule): | |
guider = Guider_PerpNeg_scheduled(model, sigmas, from_cfg, to_cfg, schedule, neg_scale) | |
guider.set_conds(positive, negative, empty_conditioning) | |
return guider, sigmas | |
NODE_CLASS_MAPPINGS = { | |
"KSamplerProgress //Inspire": KSampler_progress, | |
"KSamplerAdvancedProgress //Inspire": KSamplerAdvanced_progress, | |
"ScheduledCFGGuider //Inspire": ScheduledCFGGuider, | |
"ScheduledPerpNegCFGGuider //Inspire": ScheduledPerpNegCFGGuider | |
} | |
NODE_DISPLAY_NAME_MAPPINGS = { | |
"KSamplerProgress //Inspire": "KSampler Progress (Inspire)", | |
"KSamplerAdvancedProgress //Inspire": "KSampler Advanced Progress (Inspire)", | |
"ScheduledCFGGuider //Inspire": "Scheduled CFGGuider (Inspire)", | |
"ScheduledPerpNegCFGGuider //Inspire": "Scheduled PerpNeg CFGGuider (Inspire)" | |
} | |