import requests import random from PIL import Image, ImageOps, ImageDraw, ImageFont from datetime import datetime from pathlib import Path import io, re, base64, os import zipfile import math BASE_URL="https://image.novelai.net" def process_xargs(xargs): processed_xargs = [] skip_until = None # 이 변수는 현재 처리 중인 '<...>' 구문의 끝 인덱스를 추적합니다. for i, elem in enumerate(xargs): if skip_until is not None and i <= skip_until: continue # 이미 처리된 범위를 건너뛰기 if "add_negative:<" in elem or "rem_negative:<" in elem: # '<'로 시작하여 '>'로 끝나는 구문 처리 if elem.endswith(">"): processed_xargs.append(elem) else: combined_elem = elem for j in range(i + 1, len(xargs)): combined_elem += ',' + xargs[j] # ','를 포함하여 원소들을 결합 if xargs[j].endswith(">"): skip_until = j # '>'로 끝나는 원소를 찾으면, 처리할 범위의 끝 인덱스를 업데이트 break processed_xargs.append(combined_elem) else: processed_xargs.append(elem) # 일반적인 원소는 그대로 리스트에 추가 return processed_xargs def event_cond_negative(_prompt, negative, user_input, rating): def insert_text_after_keyword(negative, user_keyword, user_additional_keyword): if user_keyword in negative: index = negative.index(user_keyword) + 1 negative.insert(index, user_additional_keyword) return negative def parse_conditional_command(command): match = re.match(r"\((.*?)\)\:(.*)", command) if match: return match.groups() return '', command def check_condition(prompt, condition, rating): if not condition: return True sub_conditions = re.split(r'\)\s*&\s*\(', condition) sub_conditions = [re.sub(r'^\(|\)$', '', cond) for cond in sub_conditions] results = [] for sub_cond in sub_conditions: if '&' in sub_cond: results.append(all(check_condition(prompt, cond, rating) for cond in sub_cond.split('&'))) elif '|' in sub_cond: results.append(any(check_condition(prompt, cond, rating) for cond in sub_cond.split('|'))) else: if sub_cond in ['e', 'q', 's', 'g']: results.append(sub_cond == rating) elif sub_cond in ['~e', '~q', '~s', '~g']: results.append(sub_cond != rating) # PM elif sub_cond.startswith('*'): results.append(sub_cond[1:] in prompt) # NOT IN elif sub_cond.startswith('~!'): results.append(sub_cond[2:] not in prompt) elif sub_cond.startswith('~'): results.append(any(sub_cond[1:] not in element for element in prompt)) # CONTAIN else: results.append(any(sub_cond in element for element in prompt)) return all(results) def execute_command(negative, command): if '+=' in command: keyword, addition = command.split('+=', 1) addition = addition.replace('^', ', ') return insert_text_after_keyword(negative, keyword, addition) elif command.startswith('add '): keyword = command[4:] keyword = keyword.replace('^', ', ') keys = keyword.split(',') keys = [key.strip() for key in keys] for key in keys: if key not in negative: negative.append(key) return negative elif command.startswith('rem '): keyword = command[4:] keyword = keyword.replace('^', ', ') keys = keyword.split(',') keys = [key.strip() for key in keys] for key in keys: if key in negative: negative.remove(key) return negative elif '=' in command: keyword, replacement = command.split('=', 1) if keyword in negative: replacement = replacement.replace('^', ', ') index = negative.index(keyword) negative[index] = replacement return negative negative = negative.split(',') negative = [neg.strip() for neg in negative] prompt = _prompt.split(',') prompt = [key.strip() for key in prompt] commands = [cmd.strip() for cmd in user_input.split(',') if not cmd.strip().startswith('#')] for command in commands: condition, cmd = parse_conditional_command(command) if check_condition(prompt, condition, rating): negative = execute_command(negative, cmd) return ', '.join(negative) def image_to_base64(image): image_bytesIO = io.BytesIO() image.save(image_bytesIO, format="PNG") return base64.b64encode(image_bytesIO.getvalue()).decode() def make_turbo_prompt(gen_request): lines = gen_request['prompt'] result = { "boys": False, "girls": False, "1girl": False, "1boy": False, "1other": False, "others": False } state = { "nude,": False, "pov,": False, "cum,": False, "after ": False, "pussy juice": False, "barefoot": False, "breasts": False, "ejaculation": False, } def insert_spaces(source_list, reference_list): modified_list = source_list.copy() for index, keyword in enumerate(reference_list): if keyword not in source_list: space_count = len(keyword) # 키워드 길이만큼의 공백 문자 modified_list.insert(index, ' ' * space_count) return modified_list keywords = gen_request['prompt'].split(', ') filtered_keywords = [] removed_indices = [] positive0, positive1, positive2, positive3 = gen_request.copy(),gen_request.copy(),gen_request.copy(),gen_request.copy() for word in result.keys(): if word in lines: result[word] = True for word in state.keys(): if word in gen_request['prompt']: state[word] = True key_index = int((len(keywords)/2)-1) if(result["1boy"]) or (result["boys"]): if(result["1girl"]): if(', sex' in gen_request['prompt'] or 'group sex' in gen_request['prompt']): sex_pos_keywords = ['stomach bulge','insertion', 'fucked silly', 'x-ray', 'orgasm', 'cross-section', 'uterus', 'overflow', 'rape', 'vaginal', 'anal'] facial_keywords = ['tongue','ahegao'] temp_sex_pos = [] temp_facial = [] cum_events = [] explicit_check = [] if 'open mouth' in keywords: keywords.remove('open mouth') if 'closed mouth' in keywords: keywords.remove('closed mouth') if 'after rape' in keywords: keywords.remove('after rape') explicit_check.append('after rape') if 'used condom' in keywords: keywords.remove('used condom') explicit_check.append('used condom') for keyword in keywords: if ('sex' not in keyword and 'cum' not in keyword and 'ejaculation' not in keyword and 'vaginal' not in keyword and 'penetration' not in keyword) and all(sex_pos not in keyword for sex_pos in sex_pos_keywords) and all(facial not in keyword for facial in facial_keywords): filtered_keywords.append(keyword) elif 'sex' in keyword: removed_indices.append(keyword) elif 'penetration' in keyword: removed_indices.append(keyword) elif 'cum' in keyword and keyword != 'cum': cum_events.append(keyword) elif any(sex_pos in keyword for sex_pos in sex_pos_keywords): for sex_pos in sex_pos_keywords: if sex_pos in keyword: temp_sex_pos.append(sex_pos) elif any(facial not in keyword for facial in facial_keywords): for facial in facial_keywords: if facial in keyword: temp_facial.append(facial) filtered_keywords.insert(int((len(filtered_keywords)/2)-1), ' no penetration, imminent penetration') filtered_keywords_positive0 = filtered_keywords.copy() filtered_keywords.remove(' no penetration, imminent penetration') #0 imminent penetration, imminent sex if 'condom' in filtered_keywords and 'condom on penis' not in filtered_keywords: t_index = filtered_keywords.index('condom') rand_num = random.randint(0,2) if rand_num == 1: filtered_keywords.insert(t_index, 'condom on penis') for i, keyword in enumerate(filtered_keywords): if 'pantyhose' in keyword: filtered_keywords[i] = 'torn ' + filtered_keywords[i] #1 default key_index = int((len(filtered_keywords)/2)-1) if 'pussy' in filtered_keywords: key_index = filtered_keywords.index('pussy') if 'penis' in filtered_keywords: key_index = filtered_keywords.index('penis') filtered_keywords[key_index:key_index] = ['motion lines', 'surprised'] for keyword in removed_indices: if 'cum' not in keyword and 'ejaculation' not in keyword: filtered_keywords.insert(key_index,keyword) if(temp_sex_pos): filtered_keywords[key_index:key_index] = temp_sex_pos if 'group sex' in filtered_keywords and 'sex' not in filtered_keywords: t_index = filtered_keywords.index('group sex') filtered_keywords.insert(t_index, 'sex') if('clothed sex' in filtered_keywords and not 'bottomless' in filtered_keywords): filtered_keywords.insert(filtered_keywords.index('clothed sex')+1, 'bottomless') pos1_copied_keywords = filtered_keywords.copy() for i, keyword in enumerate(pos1_copied_keywords): if 'closed eyes' in keyword: rand_num = random.randint(0,2) if(rand_num == 0): pos1_copied_keywords[i] = 'half-' + pos1_copied_keywords[i] elif(rand_num == 1 and 'closed eyes' in pos1_copied_keywords): pos1_copied_keywords.remove('closed eyes') filtered_keywords[i] = 'half-closed eyes' filtered_keywords_positive1 = pos1_copied_keywords.copy() #2 ejaculation,cum in pussy key_index = filtered_keywords.index('surprised') filtered_keywords.remove('surprised') filtered_keywords[key_index:key_index] = ["ejaculation","cum"] if "condom on penis" not in filtered_keywords else ["twitching penis", "[[[[orgasm]]]]"] for keyword in removed_indices: if 'cum' in keyword: filtered_keywords.insert(key_index,keyword) if(temp_facial): filtered_keywords[key_index:key_index] =temp_facial filtered_keywords_positive2 = filtered_keywords.copy() #3 after sex, after ejaculation for i, keyword in enumerate(filtered_keywords): if 'closed eyes' in keyword: rand_num = random.randint(0,2) if(rand_num == 0 and filtered_keywords[i] != 'half-closed eyes'): filtered_keywords[i] = 'half-' + filtered_keywords[i] elif(rand_num == 1): filtered_keywords[i] = 'empty eyes' else: filtered_keywords[i] = 'empty eyes, half-closed eyes' if 'sex' in filtered_keywords: key_index = filtered_keywords.index('sex') elif 'group sex' in filtered_keywords: key_index = filtered_keywords.index('group sex') if "condom on penis" not in filtered_keywords: filtered_keywords.remove('ejaculation') else: filtered_keywords.remove('twitching penis') filtered_keywords.remove('[[[[orgasm]]]]') filtered_keywords[key_index:key_index] = ['cum drip', 'erection'] + cum_events if "condom on penis" not in filtered_keywords else ["used condom", "{{used condom on penis}}"] if(explicit_check): filtered_keywords[key_index:key_index] = explicit_check if 'sex' in filtered_keywords and 'group sex' not in filtered_keywords: if('pussy' in filtered_keywords and not 'anal' in filtered_keywords): if "condom on penis" not in filtered_keywords: filtered_keywords.insert(filtered_keywords.index('sex')+1, 'after vaginal, spread pussy') else: filtered_keywords.insert(filtered_keywords.index('sex')+1, 'after vaginal, spread pussy, pussy juice puddle') elif('anal' in filtered_keywords): if "condom on penis" not in filtered_keywords: filtered_keywords.insert(filtered_keywords.index('sex')+1, 'after anal, cum in ass') else: filtered_keywords.insert(filtered_keywords.index('sex')+1, 'after anal, pussy juice') filtered_keywords.insert(filtered_keywords.index('sex'), 'after sex') filtered_keywords.remove('sex') elif 'group sex' in filtered_keywords: if('vaginal' in filtered_keywords and not 'anal' in filtered_keywords): filtered_keywords.insert(filtered_keywords.index('group sex')+1, 'after vaginal, spread pussy') if 'multiple penises' in filtered_keywords: if "condom on penis" not in filtered_keywords: filtered_keywords.insert(filtered_keywords.index('group sex')+3, 'cum on body, bukkake') else: filtered_keywords.insert(filtered_keywords.index('group sex')+3, 'pussy juice, pussy juice puddle') elif('anal' in filtered_keywords): if "condom on penis" not in filtered_keywords: filtered_keywords.insert(filtered_keywords.index('group sex')+1, 'after anus, cum in ass') else: filtered_keywords.insert(filtered_keywords.index('group sex')+1, 'after anus') if 'multiple penises' in filtered_keywords: if "condom on penis" not in filtered_keywords: filtered_keywords.insert(filtered_keywords.index('group sex')+3, 'cum on body, bukkake') else: filtered_keywords.insert(filtered_keywords.index('group sex')+3, 'pussy juice') else: filtered_keywords.insert(filtered_keywords.index('group sex')+1, 'cum on body, {bukkake}') temp_post_keyword = [] for keyword in sex_pos_keywords: if not (keyword == 'orgasm' or keyword == 'overflow'): if keyword in filtered_keywords: temp_post_keyword.append(keyword) for keyword in temp_post_keyword: filtered_keywords.remove(keyword) positive0['prompt'] = ', '.join(insert_spaces(filtered_keywords_positive0, filtered_keywords)).strip() positive1['prompt'] = ', '.join(insert_spaces(filtered_keywords_positive1, filtered_keywords)).strip() positive2['prompt'] = ', '.join(insert_spaces(filtered_keywords_positive2, filtered_keywords)).strip() positive3['prompt'] = ', '.join(filtered_keywords).strip() positive0["type"] = "turbo" positive1["type"] = "turbo" positive2["type"] = "turbo" positive3["type"] = "turbo" return positive0, positive1, positive2, positive3 def generate_image(access_token, prompt, model, action, parameters): if re.match(r'^http[s]?://', access_token): return generate_image_webui(access_token, prompt, model, action, parameters) return generate_image_NAI(access_token, prompt, model, action, parameters) def generate_image_NAI(access_token, prompt, model, action, parameters): data = { "input": prompt, "model": model, "action": action, "parameters": parameters, } if data['parameters']['qualityToggle'] == True: if "nai-diffusion-furry-3" not in data['model']: data['input'] += ', best quality, amazing quality, very aesthetic, absurdres' else: data['input'] += ', {best quality}, {amazing quality}' if data['parameters']['sampler'] not in ["k_euler", "k_euler_ancestral", "k_dpmpp_sde", "k_dpmpp_2s_ancestral", "k_dpmpp_2m", "k_dpmpp_2m_sde"]: data['parameters']['sampler'] = "k_euler_ancestral" if data['parameters']['cfg_rescale'] > 1: data['parameters']['cfg_rescale'] = 0 response = requests.post(f"{BASE_URL}/ai/generate-image", json=data, headers={ "Authorization": f"Bearer {access_token}" }) # catch any errors return response.content def augment_image_NAI(gen_request): def resize_and_fill(image, max_size=None): if max_size is None: max_size = gen_request["user_screen_size"] original_width, original_height = image.size if original_width > max_size or original_height > max_size: # 비율을 유지하면서 크기 조정 image.thumbnail((max_size, max_size)) # 새 이미지 크기 계산 width, height = image.size new_image = Image.new("RGB", (max_size, max_size), "black") new_image.paste(image, ((max_size - width) // 2, (max_size - height) // 2)) return new_image else: return image def log_error(e, output_file_path="output_file_path"): # 현재 시간을 얻습니다 current_time = datetime.now().strftime("%m/%d %H:%M:%S") # 에러 로그 메시지 error_message = f"#### Error occured at {current_time} ####\nError: {e}\n############################################\n" # 지정된 출력 폴더의 error_log.txt 파일에 쓰기 with open(f"error_log.txt", "a") as file: file.write(error_message) access_token = gen_request["access_token"] mode = gen_request["mode"] defry = gen_request["defry"] prompt = gen_request["prompt"] iw = gen_request["width"] ih = gen_request["height"] image = gen_request["image"] if mode in ["declutter", "lineart", "sketch", "colorize"]: _mode = mode else: _mode = "emotion" mode = mode.lower() data = { "req_type": _mode, "width": iw, "height": ih, "image" : image_to_base64(image) } dfval = { "Normal" : 0, "Slightly Weak" : 1, "Weak" : 2, "Even Weaker" : 3, "Very Weak" : 4, "Weakest" : 5 } if _mode == "emotion": try: data["prompt"] = mode+";;"+prompt+";" except: data["prompt"] = mode+";" try: data["defry"] = dfval[defry] except: data["defry"] = 0 prompt = data["prompt"] elif _mode == "colorize": data["prompt"] = prompt try: data["defry"] = dfval[defry] except: data["defry"] = 0 else: prompt = "" aug_response = requests.post(f"{BASE_URL}/ai/augment-image", json=data, headers={ "Authorization": f"Bearer {access_token}" }) save_folder = gen_request["save_folder"] additional_folder = '' if gen_request["png_rule"] == "count": additional_folder = "/" + gen_request["start_time"] if "additional_save_folder" in gen_request: if gen_request["additional_save_folder"]["command1"] != "": additional_folder += "/" + gen_request["additional_save_folder"]["command1"].replace('/',"_") if gen_request["additional_save_folder"]["command2"] != "": additional_folder += "/" + gen_request["additional_save_folder"]["command2"].replace('/',"_") forbidden_chars = r'[\:*?"<>|]' additional_folder = re.sub(forbidden_chars, '_', additional_folder) additional_folder += "/director" d = Path(save_folder + additional_folder) d.mkdir(parents=True, exist_ok=True) try: zipped = zipfile.ZipFile(io.BytesIO(aug_response.content)) result_list = [] for idx, file_info in enumerate(zipped.infolist()): image_bytes = zipped.read(file_info) if gen_request["png_rule"] == "count": _count = gen_request["count"] if "batch_size" in gen_request: filename = (d / f"{_count:05}_{idx}.png" ) else: filename = (d / f"{_count:05}.png" ) else: if "batch_size" in gen_request: filename = (d / f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{idx}.png" ) else: filename = (d / f"{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" ) filename.write_bytes(image_bytes) i = Image.open(io.BytesIO(image_bytes)) i = ImageOps.exif_transpose(i).convert("RGB") if "artist" not in gen_request: i_resized = resize_and_fill(i) next = i_resized, prompt, 0, i.info, str(filename) result_list.append(next) return i_resized, prompt, 0, i.info, str(filename) if len(result_list) ==1 else result_list except Exception as e: try: if aug_response.content is None: raise ValueError("Connection broken (Protocol Error)") error_message = aug_response.decode('utf-8')[2:-2] except Exception as inner_exception: error_message = str(inner_exception) log_error(error_message, "path_to_output_folder") return None, error_message, 0, None, None def upscale_NAI(access_token, image, parameters): img_str = image_to_base64(image) _width, _height = image.size data = { "image": img_str, "width": _width, "height": _height, "scale": 4, } response = requests.post(f"https://api.novelai.net/ai/upscale", json=data, headers={ "Authorization": f"Bearer {access_token}" }) return response.content def convert_prompt(prompt): keywords = [keyword.strip() for keyword in prompt.split(',')] converted_keywords = [] for keyword in keywords: #if 'artist:' in keyword: # keyword = keyword.replace('artist:', '') #if '(' in keyword: # keyword = keyword.replace('(', '\(').replace(')', '\)') if '{' in keyword: keyword = keyword.replace('{', '(').replace('}', ')') converted_keywords.append(keyword) return ', '.join(converted_keywords) def convert_prompt_ad(prompt, data): keywords = [keyword.strip() for keyword in prompt.split(',')] converted_keywords = [] closed_eyes_check = False if "closed eyes" not in keywords else True for idx, keyword in enumerate(keywords): if idx > 4 and (keyword in data.bag_of_tags): if ("eyes" in keyword or "pupils" in keyword) and closed_eyes_check: continue keywords[idx] = "((" + keyword + "))" if 'artist:' in keyword: keyword = keyword.replace('artist:', '').replace('[','').replace(']','') keyword = keyword if '(' in keyword: keyword = keyword.replace('(', '\(').replace(')', '\)') if '{' in keyword: keyword = keyword.replace('{', '(').replace('}', ')') if '}' in keyword: keyword = keyword.replace('}', ')') converted_keywords.append(keyword) return ', '.join(converted_keywords) def interrogate_webui(img, access_token): img_str = image_to_base64(img) req_img = { "image": img_str, "threshold": 0.35, "model": "wd-v1-4-moat-tagger.v2", "queue": "" } try: res = requests.post(f"{access_token}/tagger/v1/interrogate", json=req_img) if res.status_code != 200: raise Exception(f"Error code: {res.status_code}") text = res.json().get('caption', {}).get('tag', {}) text_list = [txt.replace("_", " ") for txt in text.keys()] _rating = res.json().get('caption', {}).get('rating', {}) rating = max(_rating, key=_rating.get) if rating == "sensitive": rv = "s" elif rating == "questionable": rv = "q" elif rating == "explicit": rv = "e" else: rv = "g" return ", ".join(text_list), rv except Exception as e: return f"{e} : WEBUI의 Extension에 stable-diffusion-webui-wd14-tagger 가 정상적으로 설치되어있는지 확인하세요. (WD14 기준버전 : e72d984b, 2023-11-04)", None def generate_image_webui(access_token, prompt, model, action, parameters): samplers = { "k_euler": "Euler", "k_euler_ancestral": "Euler a", "k_dpmpp_2s_ancestral": "DPM++ 2S a", "k_dpmpp_sde": "DPM++ SDE", "k_dpm_3m_sde": "DPM++ 3M SDE" } # matching data format data = { "input": prompt, "model": model, "action": action, "parameters": parameters, } if data['parameters']['sampler'] in samplers: data['parameters']['sampler'] = samplers[data['parameters']['sampler']] params = { "prompt": convert_prompt(data['input']) if "nai_enable_AD" not in parameters else convert_prompt_ad(data['input'], parameters["ad_data"]), "negative_prompt": convert_prompt(data['parameters']['negative_prompt']) if "nai_enable_AD" not in parameters else convert_prompt_ad(data['parameters']['negative_prompt'], parameters["ad_data"]), "steps": math.floor(data['parameters']['cfg_rescale'] / 0.5) * 0.5, "width": data['parameters']['width'], "height": data['parameters']['height'], "cfg_scale": data['parameters']['scale'], "sampler_index": data['parameters']['sampler'], "seed": data['parameters']['seed'], "seed_resize_from_h": -1, "seed_resize_from_w": -1, "denoising_strength": None, "n_iter": "1", "batch_size": data['parameters']['n_samples'] } if 'scheduler' in parameters: params["scheduler"] = data['parameters']['scheduler'] if data['parameters']['enable_hr'] == True: params['enable_hr'] = True params["hr_upscaler"] = data['parameters']["hr_upscaler"] params["hr_scale"] = data['parameters']["hr_scale"] params["hr_second_pass_steps"] = data['parameters']["hr_second_pass_steps"] params["denoising_strength"] = data['parameters']["denoising_strength"] if data['parameters']['enable_AD'] == True: params["alwayson_scripts"] = {"ADetailer": { "args": [ True, False if "nai_enable_AD" not in parameters else True, { "ad_model": "face_yolov8n.pt", "ad_prompt": params["prompt"], "ad_negative_prompt": params["negative_prompt"], "ad_denoising_strength": 0.4 if "nai_enable_AD" not in parameters else parameters["ad_data_str"], "ad_inpaint_only_masked": True, "ad_inpaint_only_masked_padding": 32, "ad_sampler" : data['parameters']['sampler'] #"ad_scheduler": "Automatic" } ] } } if "nai_enable_AD" in parameters: params["steps"] = 28 else: params["alwayson_scripts"] = {} if 'enable_TV' in data['parameters'] and data['parameters']['enable_TV'] == True: params["alwayson_scripts"]["Tiled VAE"] = {} params["alwayson_scripts"]["Tiled VAE"]["args"] = data['parameters']["tiled_vae_args"] if 'pag' in parameters: params["alwayson_scripts"]["Incantations"] = {} pag_pre = math.floor(float(data['parameters']['pag']) / 0.5) * 0.5 params["alwayson_scripts"]["Incantations"]["args"] = [True,pag_pre, 0, 150, False, "Constant", 0, 100, False,False,2,0.1,0.5,0,"",0,25, 1, False,False,False,"BREAK","-",0.2,10] if "image" in data['parameters']: params['init_images'] = [data['parameters']['image']] params['include_init_images'] = True if "strength" in data['parameters']: params['denoising_strength'] = data['parameters']['strength'] if "mask" in data['parameters']: params['mask'] = data['parameters']['mask'] params['inpainting_fill'] = 1 params['inpaint_full_res'] = data['parameters']['add_original_image'] params['inpaint_full_res_padding'] = 32 if 'denoising_strength' not in params: params['denoising_strength'] = 0.7 res = requests.post(f"{access_token}/sdapi/v1/img2img", json=params) else: res = requests.post(f"{access_token}/sdapi/v1/txt2img", json=params) imageb64s = res.json()['images'] content = None for b64 in imageb64s: img = b64.encode() content = base64.b64decode(img) s = io.BytesIO() zf = zipfile.ZipFile(s, "w") for idx, b64 in enumerate(imageb64s): content = base64.b64decode(b64) zf.writestr(f"generated_{idx}.png", content) zf.close() return s.getvalue() def generate_guide_image_webui(access_token, parameters): samplers = { "k_euler": "Euler", "k_euler_ancestral": "Euler a", "k_dpmpp_2s_ancestral": "DPM++ 2S a", "k_dpmpp_sde": "DPM++ SDE", "k_dpm_3m_sde": "DPM++ 3M SDE" } params = { "prompt": convert_prompt(parameters['prompt']), "negative_prompt": convert_prompt(parameters['negative prompt']), "steps": parameters['steps'], "width": parameters["width"], "height": parameters["height"], "cfg_scale": parameters['cfg_scale'], "sampler_index": samplers[parameters['sampler']] if parameters['sampler'] in samplers else parameters['sampler'], "seed": parameters['seed'], "seed_resize_from_h": -1, "seed_resize_from_w": -1, "denoising_strength": None, "n_iter": "1", "batch_size": 1 } additional_folder = "/guide" if "start_time" in parameters and parameters["png_rule"] == "count": additional_folder = "/" + parameters["start_time"] + "/guide" if "save_folder" in parameters: save_folder = parameters["save_folder"] try: res = requests.post(f"{access_token}/sdapi/v1/txt2img", json=params) imageb64s = res.json()['images'] image_data = base64.b64decode(imageb64s[0]) image_file = io.BytesIO(image_data) i = Image.open(image_file) if "save_folder" in parameters: s = io.BytesIO() zf = zipfile.ZipFile(s, "w") for idx, b64 in enumerate(imageb64s): img_data = base64.b64decode(b64) img = Image.open(io.BytesIO(img_data)) jpeg_buffer = io.BytesIO() try: img.save(jpeg_buffer, format="JPEG", exif=img.info.get('parameters').encode('utf-8')) except: img.save(jpeg_buffer, format="JPEG") jpeg_content = jpeg_buffer.getvalue() zf.writestr(f"generated_{idx}.jpg", jpeg_content) zf.close() d = Path(save_folder + additional_folder) d.mkdir(parents=True, exist_ok=True) zipped = zipfile.ZipFile(io.BytesIO(s.getvalue())) result_list = [] for idx, file_info in enumerate(zipped.infolist()): image_bytes = zipped.read(file_info) filename = (d / f"{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" ) filename.write_bytes(image_bytes) except: i = Image.new('RGB', (768, 768), 'white') return i def generate_typer_image_webui(access_token, parameters): params = { "prompt": convert_prompt(parameters['prompt']), "negative_prompt": convert_prompt(parameters['negative prompt']), "steps": parameters['steps'], "width": parameters["width"], "height": parameters["height"], "cfg_scale": parameters['cfg_scale'], "sampler_index": parameters['sampler'], "seed": parameters['seed'], "seed_resize_from_h": -1, "seed_resize_from_w": -1, "denoising_strength": None, "n_iter": "1", "batch_size": 1 } if "scheduler" in parameters: params["scheduler"] = parameters["scheduler"] try: res = requests.post(f"{access_token}/sdapi/v1/txt2img", json=params) imageb64s = res.json()['images'] image_data = base64.b64decode(imageb64s[0]) image_file = io.BytesIO(image_data) i = Image.open(image_file) except: i = Image.new('RGB', (768, 768), 'white') return i def generate_dtg(access_token, parameters): params = { "prompt": parameters['prompt'], "negative_prompt": parameters['negative prompt'], "steps": 1, "width": 64, "height": 64, "cfg_scale": 1, "sampler_index": "Euler a", "seed": 1, "seed_resize_from_h": -1, "seed_resize_from_w": -1, "denoising_strength": None, "n_iter": "1", "batch_size": 1, } if parameters["format"] == "Animagine": format = "<|special|>, <|characters|>, <|copyrights|>, <|artist|>, <|general|>, <|quality|>, <|meta|>, <|rating|>" elif parameters["format"] == "Pony": format = "<|quality|>, <|special|>, <|characters|>, <|copyrights|>, <|artist|>, <|general|>, <|meta|>, <|rating|>" params["alwayson_scripts"] = {} params["alwayson_scripts"]["DanTagGen"] = { "args" : [ True, "After applying other prompt processings", -1, #random.randint(0, 4294967295 - params["seed"]) parameters["length"], params["negative_prompt"], format, parameters["temp"], 0.95, 100, "KBlueLeaf/DanTagGen-delta-rev2", False, False ] } try: res = requests.post(f"{access_token}/sdapi/v1/txt2img", json=params) imageb64s = res.json()['images'] image_data = base64.b64decode(imageb64s[0]) image_file = io.BytesIO(image_data) i = Image.open(image_file) except: return "" return i.info def generate(gen_request, _naia): def parse_and_execute_commands(_prompt, negative, user_input, rating): negative = negative.split(',') negative = [neg.strip() for neg in negative] prompt = _prompt.split(',') prompt = [key.strip() for key in prompt] commands = [cmd.strip() for cmd in user_input.split(',') if not cmd.strip().startswith('#')] for command in commands: condition, cmd = parse_conditional_command(command) if check_condition(prompt, condition, rating): negative = execute_command(negative, cmd) return ', '.join(negative) def parse_conditional_command(command): match = re.match(r"\((.*?)\)\:(.*)", command) if match: return match.groups() return '', command def check_condition(prompt, condition, rating): if not condition: return True sub_conditions = re.split(r'\)\s*&\s*\(', condition) sub_conditions = [re.sub(r'^\(|\)$', '', cond) for cond in sub_conditions] results = [] for sub_cond in sub_conditions: if '&' in sub_cond: results.append(all(check_condition(prompt, cond, rating) for cond in sub_cond.split('&'))) elif '|' in sub_cond: results.append(any(check_condition(prompt, cond, rating) for cond in sub_cond.split('|'))) else: if sub_cond in ['e', 'q', 's', 'g']: results.append(sub_cond == rating) elif sub_cond in ['~e', '~q', '~s', '~g']: results.append(sub_cond != rating) # PM elif sub_cond.startswith('*'): results.append(sub_cond[1:] in prompt) # NOT IN elif sub_cond.startswith('~!'): results.append(sub_cond[2:] not in prompt) elif sub_cond.startswith('~'): results.append(any(sub_cond[1:] not in element for element in prompt)) # CONTAIN else: results.append(any(sub_cond in element for element in prompt)) return all(results) def execute_command(negative, command): if '+=' in command: keyword, addition = command.split('+=', 1) addition = addition.replace('^', ', ') return insert_text_after_keyword(negative, keyword, addition) elif command.startswith('add '): keyword = command[4:] keyword = keyword.replace('^', ', ') keys = keyword.split(',') keys = [key.strip() for key in keys] for key in keys: if key not in negative: negative.append(key) return negative elif command.startswith('rem '): keyword = command[4:] keyword = keyword.replace('^', ', ') keys = keyword.split(',') keys = [key.strip() for key in keys] for key in keys: if key in negative: negative.remove(key) return negative elif '=' in command: keyword, replacement = command.split('=', 1) if keyword in negative: replacement = replacement.replace('^', ', ') index = negative.index(keyword) negative[index] = replacement return negative def insert_text_after_keyword(negative, user_keyword, user_additional_keyword): if user_keyword in negative: index = negative.index(user_keyword) + 1 negative.insert(index, user_additional_keyword) return negative def open_random_png(folderpath): files = os.listdir(folderpath) png_files = [f for f in files if f.endswith('.png')] if not png_files: return None random_png = random.choice(png_files) img = Image.open(os.path.join(folderpath, random_png)) return img try: seed = int(gen_request["seed"]) except: seed = random.randint(0,9999999999) try: width = int(gen_request["width"]) height = int(gen_request["height"]) except: width, height= 1024, 1024 params = { "legacy": False, "qualityToggle": True if gen_request["quality_toggle"] == 1 else False, "width": width, "height": height, "n_samples": 1 if "batch_size" not in gen_request else gen_request["batch_size"], "seed": seed, "extra_noise_seed": random.randint(0,9999999999), "sampler": gen_request["sampler"], "steps": 28 if "steps" not in gen_request and gen_request["type"]!="upper" else gen_request["steps"], "scale": gen_request["scale"], "negative_prompt": ', '.join([keyword.strip() for keyword in gen_request["negative"].split(',') if not keyword.strip().startswith('#')]), "sm" : True if gen_request["sema"] == 1 else False, "sm_dyn" : True if gen_request["sema_dyn"] == 1 else False, "dynamic_thresholding": True if ("dynamic_thresholding" in gen_request and gen_request["dynamic_thresholding"] == True) else False, "controlnet_strength": 1.0, "add_original_image": False, "cfg_rescale": gen_request["cfg_rescale"], "noise_schedule": gen_request["noise_schedule"], "enable_hr" : gen_request["enable_hr"], "enable_AD" : gen_request["enable_AD"] } if "skip_cfg_above_sigma" in gen_request and gen_request["skip_cfg_above_sigma"] == True: if float(gen_request["uncond_scale"]) == 19.19: params["skip_cfg_above_sigma"] = 19.191344202730882 else: try: _scale = float(gen_request["uncond_scale"]) params["skip_cfg_above_sigma"] = _scale except: params["skip_cfg_above_sigma"] = 19.191344202730882 if params["enable_hr"] == True: params["hr_upscaler"] = gen_request["hr_upscaler"] params["hr_scale"] = gen_request["hr_scale"] params["hr_second_pass_steps"] = gen_request["hr_second_pass_steps"] params["denoising_strength"] = gen_request["denoising_strength"] if "enable_TV" in gen_request and gen_request["enable_TV"] == True: params["enable_TV"] = True params["tiled_vae_args"] = gen_request["tiled_vae_args"] if "reference_image" in gen_request: params["reference_image_multiple"] = gen_request["reference_image"] if isinstance(gen_request["reference_image"], list) else [gen_request["reference_image"]] params["reference_information_extracted_multiple"] = gen_request["reference_information_extracted"] if isinstance(gen_request["reference_information_extracted"], list) else [gen_request["reference_information_extracted"]] params["reference_strength_multiple"] = gen_request["reference_strength"] if isinstance(gen_request["reference_strength"], list) else [gen_request["reference_strength"]] if gen_request["type"]=="inpaint": if "mask" in gen_request: params["mask"] = gen_request["mask"] params['add_original_image'] = gen_request['add_original_image'] positive = gen_request["prompt"] positive = re.sub(r'#.*?(\n|,|$)', '', positive) keywords = [key.strip() for key in positive.split(',')] if "cond_negative" in gen_request and gen_request["cond_negative"]: user_input = gen_request["cond_negative"] rating = gen_request["rating"] params["negative_prompt"] = parse_and_execute_commands(positive, params["negative_prompt"], user_input, rating) if "repeat" in gen_request: max = gen_request["repeat_max"] for i, key in enumerate(keywords): if "->" in key: instant_keyword = [k for k in key.split('->')] if len(instant_keyword) > gen_request["repeat"]: current_key = instant_keyword[gen_request["repeat"]] else: current_key = instant_keyword[gen_request["repeat"] % len(instant_keyword)] keywords[i] = ', '.join(current_key.split('^')) filename_rule = gen_request["png_rule"] save_folder = gen_request["save_folder"] access_token = gen_request["access_token"] additional_folder = "" request_type = "generate" if "*i2i:(" in gen_request["prompt"] and "image" not in gen_request: try: start_index = gen_request["prompt"].index('*i2i:(') + len('*i2i:(') end_index = gen_request["prompt"].index(')', start_index) i2i_content = gen_request["prompt"][start_index:end_index] _img, _str = [item.strip() for item in i2i_content.split(':')] _str = float(_str) _img = open_random_png(_img) if _img: gen_request["image"] = image_to_base64(_img) gen_request["strength"] = _str gen_request["noise"] = 0 except: pass if "nai_enable_AD" in gen_request: params["nai_enable_AD"] = True params["ad_data"] = gen_request["ad_data"] params["ad_data_str"] = gen_request["ad_data_str"] if "scheduler" in gen_request: params["scheduler"] = gen_request["scheduler"] if "pag" in gen_request: params["pag"] = gen_request["pag"] if "image" in gen_request: params["image"] = gen_request["image"] if "strength" in gen_request: params["strength"] = gen_request["strength"] params["noise"] = gen_request["noise"] params["sm"] = False params["sm_dyn"] = False request_type = "img2img" if "mask" not in gen_request else "infill" temp_del = [] for key in keywords: if key.startswith('*'): temp_del.append(key) for key in temp_del: if key in keywords: keywords.remove(key) positive = ', '.join(keywords) def resize_and_fill(image, max_size=None): if max_size is None: max_size = gen_request["user_screen_size"] original_width, original_height = image.size if original_width > max_size or original_height > max_size: # 비율을 유지하면서 크기 조정 image.thumbnail((max_size, max_size)) # 새 이미지 크기 계산 width, height = image.size new_image = Image.new("RGB", (max_size, max_size), "black") new_image.paste(image, ((max_size - width) // 2, (max_size - height) // 2)) return new_image else: return image def resize_and_fill_artist(image, max_size=None, _text=""): if _text != "": if "_" in _text: _text0 = _text.split("_")[0] _text1 = _text.split("_")[1] _text = _text1+ " : "+_text0 else: _text1 = _text if max_size is None: max_size = gen_request["user_screen_size"] original_width, original_height = image.size max_size = (max_size, max_size) if original_width > max_size[0] or original_height > max_size[1]: # 비율을 유지하면서 크기 조정 image.thumbnail(max_size) # 새 이미지 크기 계산 width, height = image.size new_image = Image.new("RGB", (max_size[0], max_size[1]), "black") # 하얀 상자를 위해 높이에 100 추가 new_image.paste(image, ((max_size[0] - width) // 2, (max_size[1] - height) // 2)) # 텍스트 추가 draw = ImageDraw.Draw(new_image) font_size = 24 font = ImageFont.truetype("arial.ttf", font_size) # 폰트 파일 경로 확인 필요 # textbbox 사용하여 텍스트 바운딩 박스 계산 text_x = (max_size[0]) // 2 text_y = (max_size[1] - 40) + (font_size) // 2 bbox = draw.textbbox((text_x, text_y), _text, font=font, anchor="mm") text_width = bbox[2] - bbox[0] text_height = bbox[3] - bbox[1] # 텍스트 배경 그리기 (하얀 상자) draw.rectangle([text_x - text_width // 2 - 10, text_y - text_height // 2 - 10, text_x + text_width // 2 + 10, text_y + text_height // 2 + 10], fill="white") # 텍스트 그리기 draw.text((text_x, text_y), _text, fill="black", font=font, anchor="mm") return new_image, _text1 def log_error(e, output_file_path="output_file_path"): # 현재 시간을 얻습니다 current_time = datetime.now().strftime("%m/%d %H:%M:%S") # 에러 로그 메시지 error_message = f"#### Error occured at {current_time} ####\nError: {e}\n############################################\n" # 지정된 출력 폴더의 error_log.txt 파일에 쓰기 with open(f"error_log.txt", "a") as file: file.write(error_message) if _naia == "NAID3-Furry": _m1 = "nai-diffusion-furry-3" _m2 = "nai-diffusion-furry-3-inpainting" else: _m1 = "nai-diffusion-3" _m2 = "nai-diffusion-3-inpainting" try: zipped_bytes = generate_image(access_token, positive, _m1 if "mask" not in params else _m2, request_type, params) if gen_request["png_rule"] == "count": additional_folder = "/" + gen_request["start_time"] if "additional_save_folder" in gen_request: if gen_request["additional_save_folder"]["command1"] != "": additional_folder += "/" + gen_request["additional_save_folder"]["command1"].replace('/',"_") if gen_request["additional_save_folder"]["command2"] != "": additional_folder += "/" + gen_request["additional_save_folder"]["command2"].replace('/',"_") forbidden_chars = r'[\:*?"<>|]' additional_folder = re.sub(forbidden_chars, '_', additional_folder) if gen_request["type"] == "turbo": additional_folder += "/turbo" if ("hires" in gen_request and gen_request["hires"]) or ("enable_hr" in gen_request and gen_request["enable_hr"] and "http" in access_token) or (int(gen_request["width"]) * int(gen_request["height"]) > 1048576): additional_folder += "/hires" if "auto_i2i" in gen_request and gen_request["auto_i2i"]: if gen_request["png_rule"] == "count": additional_folder = "/" + gen_request["start_time"] + "/autoi2i" else: additional_folder = "/autoi2i" if "nai_enable_AD" in gen_request and gen_request["nai_enable_AD"]: additional_folder += "/Adetailer" if "webui_nai_i2i" in gen_request and gen_request["webui_nai_i2i"] and "http" not in access_token: additional_folder += "/webui_nai_i2i" d = Path(save_folder + additional_folder) d.mkdir(parents=True, exist_ok=True) zipped = zipfile.ZipFile(io.BytesIO(zipped_bytes)) result_list = [] for idx, file_info in enumerate(zipped.infolist()): image_bytes = zipped.read(file_info) if gen_request["png_rule"] == "count": _count = gen_request["count"] if "batch_size" in gen_request: filename = (d / f"{_count:05}_{idx}.png" ) else: filename = (d / f"{_count:05}.png" ) else: if "batch_size" in gen_request: filename = (d / f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{idx}.png" ) else: filename = (d / f"{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" ) filename.write_bytes(image_bytes) i = Image.open(io.BytesIO(image_bytes)) i = ImageOps.exif_transpose(i).convert("RGB") if "artist" not in gen_request: i_resized = resize_and_fill(i) else: i_resized, _i_temp = resize_and_fill_artist(i, None, gen_request["artist"]) if not os.path.exists(f"artist_thumb"): os.makedirs(f"artist_thumb") model_name = gen_request["artist_model"] if not os.path.exists(f"artist_thumb/{model_name}"): os.makedirs(f"artist_thumb/{model_name}") _counts = gen_request["artist"].split("_")[0] i_resized.save(f"artist_thumb/{model_name}/{_i_temp}.jpg") if not os.path.exists(f"artist_thumb/{model_name}/add_count"): os.makedirs(f"artist_thumb/{model_name}/add_count") i_resized.save(f"artist_thumb/{model_name}/add_count/{_counts}_{_i_temp}.jpg") next = i_resized, positive, params['seed'], i.info, str(filename) result_list.append(next) return i_resized, positive, params['seed'], i.info, str(filename) if len(result_list) ==1 else result_list except Exception as e: try: if zipped_bytes is None: raise ValueError("Connection broken (Protocol Error)") error_message = zipped_bytes.decode('utf-8')[2:-2] except Exception as inner_exception: error_message = str(inner_exception) log_error(error_message, "path_to_output_folder") return None, error_message, params['seed'], None, None