flatcherlee's picture
Upload 2334 files
3d5837a verified
raw
history blame
14.2 kB
import torch
from . import a1111_compat
import comfy
from .libs import common
from comfy import model_management
from comfy.samplers import CFGGuider
from comfy_extras.nodes_perpneg import Guider_PerpNeg
import math
class KSampler_progress(a1111_compat.KSampler_inspire):
@classmethod
def INPUT_TYPES(s):
return {"required": {
"model": ("MODEL",),
"seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
"cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0}),
"sampler_name": (comfy.samplers.KSampler.SAMPLERS, ),
"scheduler": (common.SCHEDULERS, ),
"positive": ("CONDITIONING", ),
"negative": ("CONDITIONING", ),
"latent_image": ("LATENT", ),
"denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}),
"noise_mode": (["GPU(=A1111)", "CPU"],),
"interval": ("INT", {"default": 1, "min": 1, "max": 10000}),
"omit_start_latent": ("BOOLEAN", {"default": True, "label_on": "True", "label_off": "False"}),
"omit_final_latent": ("BOOLEAN", {"default": False, "label_on": "True", "label_off": "False"}),
},
"optional": {
"scheduler_func_opt": ("SCHEDULER_FUNC",),
}
}
CATEGORY = "InspirePack/analysis"
RETURN_TYPES = ("LATENT", "LATENT")
RETURN_NAMES = ("latent", "progress_latent")
@staticmethod
def doit(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise, noise_mode,
interval, omit_start_latent, omit_final_latent, scheduler_func_opt=None):
adv_steps = int(steps / denoise)
if omit_start_latent:
result = []
else:
result = [latent_image['samples']]
def progress_callback(step, x0, x, total_steps):
if (total_steps-1) != step and step % interval != 0:
return
x = model.model.process_latent_out(x)
x = x.cpu()
result.append(x)
latent_image, noise = a1111_compat.KSamplerAdvanced_inspire.sample(model, True, seed, adv_steps, cfg, sampler_name, scheduler, positive, negative, latent_image, (adv_steps-steps),
adv_steps, noise_mode, False, callback=progress_callback, scheduler_func_opt=scheduler_func_opt)
if not omit_final_latent:
result.append(latent_image['samples'].cpu())
if len(result) > 0:
result = torch.cat(result)
result = {'samples': result}
else:
result = latent_image
return latent_image, result
class KSamplerAdvanced_progress(a1111_compat.KSamplerAdvanced_inspire):
@classmethod
def INPUT_TYPES(s):
return {"required": {
"model": ("MODEL",),
"add_noise": ("BOOLEAN", {"default": True, "label_on": "enable", "label_off": "disable"}),
"noise_seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
"cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step":0.5, "round": 0.01}),
"sampler_name": (comfy.samplers.KSampler.SAMPLERS, ),
"scheduler": (common.SCHEDULERS, ),
"positive": ("CONDITIONING", ),
"negative": ("CONDITIONING", ),
"latent_image": ("LATENT", ),
"start_at_step": ("INT", {"default": 0, "min": 0, "max": 10000}),
"end_at_step": ("INT", {"default": 10000, "min": 0, "max": 10000}),
"noise_mode": (["GPU(=A1111)", "CPU"],),
"return_with_leftover_noise": ("BOOLEAN", {"default": False, "label_on": "enable", "label_off": "disable"}),
"interval": ("INT", {"default": 1, "min": 1, "max": 10000}),
"omit_start_latent": ("BOOLEAN", {"default": False, "label_on": "True", "label_off": "False"}),
"omit_final_latent": ("BOOLEAN", {"default": False, "label_on": "True", "label_off": "False"}),
},
"optional": {
"prev_progress_latent_opt": ("LATENT",),
"scheduler_func_opt": ("SCHEDULER_FUNC",),
}
}
FUNCTION = "doit"
CATEGORY = "InspirePack/analysis"
RETURN_TYPES = ("LATENT", "LATENT")
RETURN_NAMES = ("latent", "progress_latent")
def doit(self, model, add_noise, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image,
start_at_step, end_at_step, noise_mode, return_with_leftover_noise, interval, omit_start_latent, omit_final_latent,
prev_progress_latent_opt=None, scheduler_func_opt=None):
if omit_start_latent:
result = []
else:
result = [latent_image['samples']]
def progress_callback(step, x0, x, total_steps):
if (total_steps-1) != step and step % interval != 0:
return
x = model.model.process_latent_out(x)
x = x.cpu()
result.append(x)
latent_image, noise = a1111_compat.KSamplerAdvanced_inspire.sample(model, add_noise, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, start_at_step, end_at_step,
noise_mode, False, callback=progress_callback, scheduler_func_opt=scheduler_func_opt)
if not omit_final_latent:
result.append(latent_image['samples'].cpu())
if len(result) > 0:
result = torch.cat(result)
result = {'samples': result}
else:
result = latent_image
if prev_progress_latent_opt is not None:
result['samples'] = torch.cat((prev_progress_latent_opt['samples'], result['samples']), dim=0)
return latent_image, result
def exponential_interpolation(from_cfg, to_cfg, i, steps):
if i == steps-1:
return to_cfg
if from_cfg == to_cfg:
return from_cfg
if from_cfg == 0:
return to_cfg * (1 - math.exp(-5 * i / steps)) / (1 - math.exp(-5))
elif to_cfg == 0:
return from_cfg * (math.exp(-5 * i / steps) - math.exp(-5)) / (1 - math.exp(-5))
else:
log_from = math.log(from_cfg)
log_to = math.log(to_cfg)
log_value = log_from + (log_to - log_from) * i / steps
return math.exp(log_value)
def logarithmic_interpolation(from_cfg, to_cfg, i, steps):
if i == 0:
return from_cfg
if i == steps-1:
return to_cfg
log_i = math.log(i + 1)
log_steps = math.log(steps + 1)
t = log_i / log_steps
return from_cfg + (to_cfg - from_cfg) * t
def cosine_interpolation(from_cfg, to_cfg, i, steps):
if (i == 0) or (i == steps-1):
return from_cfg
t = (1.0 + math.cos(math.pi*2*(i/steps))) / 2
return from_cfg + (to_cfg - from_cfg) * t
class Guider_scheduled(CFGGuider):
def __init__(self, model_patcher, sigmas, from_cfg, to_cfg, schedule):
super().__init__(model_patcher)
self.default_cfg = self.cfg
self.sigmas = sigmas
self.cfg_sigmas = None
self.cfg_sigmas_i = None
self.from_cfg = from_cfg
self.to_cfg = to_cfg
self.schedule = schedule
self.last_i = 0
self.renew_cfg_sigmas()
def set_cfg(self, cfg):
self.default_cfg = cfg
self.renew_cfg_sigmas()
def renew_cfg_sigmas(self):
self.cfg_sigmas = {}
self.cfg_sigmas_i = {}
i = 0
steps = len(self.sigmas) - 1
for x in self.sigmas:
k = float(x)
delta = self.to_cfg - self.from_cfg
if self.schedule == 'exp':
self.cfg_sigmas[k] = exponential_interpolation(self.from_cfg, self.to_cfg, i, steps), i
elif self.schedule == 'log':
self.cfg_sigmas[k] = logarithmic_interpolation(self.from_cfg, self.to_cfg, i, steps), i
elif self.schedule == 'cos':
self.cfg_sigmas[k] = cosine_interpolation(self.from_cfg, self.to_cfg, i, steps), i
else:
self.cfg_sigmas[k] = self.from_cfg + delta * i / steps, i
self.cfg_sigmas_i[i] = self.cfg_sigmas[k]
i += 1
def predict_noise(self, x, timestep, model_options={}, seed=None):
k = float(timestep[0])
v = self.cfg_sigmas.get(k)
if v is None:
# fallback
v = self.cfg_sigmas_i[self.last_i+1]
self.cfg_sigmas[k] = v
self.last_i = v[1]
self.cfg = v[0]
return super().predict_noise(x, timestep, model_options, seed)
class Guider_PerpNeg_scheduled(Guider_PerpNeg):
def __init__(self, model_patcher, sigmas, from_cfg, to_cfg, schedule, neg_scale):
super().__init__(model_patcher)
self.default_cfg = self.cfg
self.sigmas = sigmas
self.cfg_sigmas = None
self.cfg_sigmas_i = None
self.from_cfg = from_cfg
self.to_cfg = to_cfg
self.schedule = schedule
self.neg_scale = neg_scale
self.last_i = 0
self.renew_cfg_sigmas()
def set_cfg(self, cfg):
self.default_cfg = cfg
self.renew_cfg_sigmas()
def renew_cfg_sigmas(self):
self.cfg_sigmas = {}
self.cfg_sigmas_i = {}
i = 0
steps = len(self.sigmas) - 1
for x in self.sigmas:
k = float(x)
delta = self.to_cfg - self.from_cfg
if self.schedule == 'exp':
self.cfg_sigmas[k] = exponential_interpolation(self.from_cfg, self.to_cfg, i, steps), i
elif self.schedule == 'log':
self.cfg_sigmas[k] = logarithmic_interpolation(self.from_cfg, self.to_cfg, i, steps), i
elif self.schedule == 'cos':
self.cfg_sigmas[k] = cosine_interpolation(self.from_cfg, self.to_cfg, i, steps), i
else:
self.cfg_sigmas[k] = self.from_cfg + delta * i / steps, i
self.cfg_sigmas_i[i] = self.cfg_sigmas[k]
i += 1
def predict_noise(self, x, timestep, model_options={}, seed=None):
k = float(timestep[0])
v = self.cfg_sigmas.get(k)
if v is None:
# fallback
v = self.cfg_sigmas_i[self.last_i+1]
self.cfg_sigmas[k] = v
self.last_i = v[1]
self.cfg = v[0]
return super().predict_noise(x, timestep, model_options, seed)
class ScheduledCFGGuider:
@classmethod
def INPUT_TYPES(s):
return {"required": {
"model": ("MODEL", ),
"positive": ("CONDITIONING", ),
"negative": ("CONDITIONING", ),
"sigmas": ("SIGMAS", ),
"from_cfg": ("FLOAT", {"default": 6.5, "min": 0.0, "max": 100.0, "step": 0.1, "round": 0.01}),
"to_cfg": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 100.0, "step": 0.1, "round": 0.01}),
"schedule": (["linear", "log", "exp", "cos"], {'default': 'log'})
}
}
RETURN_TYPES = ("GUIDER", "SIGMAS")
FUNCTION = "get_guider"
CATEGORY = "sampling/custom_sampling/guiders"
def get_guider(self, model, positive, negative, sigmas, from_cfg, to_cfg, schedule):
guider = Guider_scheduled(model, sigmas, from_cfg, to_cfg, schedule)
guider.set_conds(positive, negative)
return guider, sigmas
class ScheduledPerpNegCFGGuider:
@classmethod
def INPUT_TYPES(s):
return {"required": {
"model": ("MODEL", ),
"positive": ("CONDITIONING", ),
"negative": ("CONDITIONING", ),
"empty_conditioning": ("CONDITIONING", ),
"neg_scale": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 100.0, "step": 0.01}),
"sigmas": ("SIGMAS", ),
"from_cfg": ("FLOAT", {"default": 6.5, "min": 0.0, "max": 100.0, "step": 0.1, "round": 0.01}),
"to_cfg": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 100.0, "step": 0.1, "round": 0.01}),
"schedule": (["linear", "log", "exp", "cos"], {'default': 'log'})
}
}
RETURN_TYPES = ("GUIDER", "SIGMAS")
FUNCTION = "get_guider"
CATEGORY = "sampling/custom_sampling/guiders"
def get_guider(self, model, positive, negative, empty_conditioning, neg_scale, sigmas, from_cfg, to_cfg, schedule):
guider = Guider_PerpNeg_scheduled(model, sigmas, from_cfg, to_cfg, schedule, neg_scale)
guider.set_conds(positive, negative, empty_conditioning)
return guider, sigmas
NODE_CLASS_MAPPINGS = {
"KSamplerProgress //Inspire": KSampler_progress,
"KSamplerAdvancedProgress //Inspire": KSamplerAdvanced_progress,
"ScheduledCFGGuider //Inspire": ScheduledCFGGuider,
"ScheduledPerpNegCFGGuider //Inspire": ScheduledPerpNegCFGGuider
}
NODE_DISPLAY_NAME_MAPPINGS = {
"KSamplerProgress //Inspire": "KSampler Progress (Inspire)",
"KSamplerAdvancedProgress //Inspire": "KSampler Advanced Progress (Inspire)",
"ScheduledCFGGuider //Inspire": "Scheduled CFGGuider (Inspire)",
"ScheduledPerpNegCFGGuider //Inspire": "Scheduled PerpNeg CFGGuider (Inspire)"
}