import requests import random from PIL import Image, ImageOps, ImageTk from datetime import datetime import time from pathlib import Path import io import zipfile #for webui import base64 import re import json BASE_URL="https://api.novelai.net" def make_turbo_prompt(gen_request): lines = gen_request['prompt'] result = { "boys": False, "girls": False, "1girl": False, "1boy": False, "1other": False, "others": False } state = { "nude,": False, "pov,": False, "cum,": False, "after ": False, "pussy juice": False, "barefoot": False, "breasts": False, "ejaculation": False, } def insert_spaces(source_list, reference_list): modified_list = source_list.copy() for index, keyword in enumerate(reference_list): if keyword not in source_list: space_count = len(keyword) # 키워드 길이만큼의 공백 문자 modified_list.insert(index, ' ' * space_count) return modified_list keywords = gen_request['prompt'].split(', ') filtered_keywords = [] removed_indices = [] positive0, positive1, positive2, positive3 = gen_request.copy(),gen_request.copy(),gen_request.copy(),gen_request.copy() for word in result.keys(): if word in lines: result[word] = True for word in state.keys(): if word in gen_request['prompt']: state[word] = True key_index = int((len(keywords)/2)-1) if(result["1boy"]) or (result["boys"]): if(result["1girl"]): if('sex,' in gen_request['prompt']): sex_pos_keywords = ['stomach bulge','insertion', 'fucked silly', 'x-ray', 'orgasm', 'cross-section', 'uterus', 'overflow', 'rape', 'vaginal', 'anal'] facial_keywords = ['tongue','ahegao'] temp_sex_pos = [] temp_facial = [] cum_events = [] explicit_check = [] if 'open mouth' in keywords: keywords.remove('open mouth') if 'closed mouth' in keywords: keywords.remove('closed mouth') if 'after rape' in keywords: keywords.remove('after rape') explicit_check.append('after rape') for keyword in keywords: if ('sex' not in keyword and 'cum' not in keyword and 'ejaculation' not in keyword and 'vaginal' not in keyword and 'penetration' not in keyword) and all(sex_pos not in keyword for sex_pos in sex_pos_keywords) and all(facial not in keyword for facial in facial_keywords): filtered_keywords.append(keyword) elif 'sex' in keyword: removed_indices.append(keyword) elif 'penetration' in keyword: removed_indices.append(keyword) elif 'cum' in keyword and keyword != 'cum': cum_events.append(keyword) elif any(sex_pos in keyword for sex_pos in sex_pos_keywords): for sex_pos in sex_pos_keywords: if sex_pos in keyword: temp_sex_pos.append(sex_pos) elif any(facial not in keyword for facial in facial_keywords): for facial in facial_keywords: if facial in keyword: temp_facial.append(facial) filtered_keywords.insert(int((len(filtered_keywords)/2)-1), ' no penetration, imminent penetration') filtered_keywords_positive0 = filtered_keywords.copy() filtered_keywords.remove(' no penetration, imminent penetration') #0 imminent penetration, imminent sex for i, keyword in enumerate(filtered_keywords): if 'pantyhose' in keyword: filtered_keywords[i] = 'torn ' + filtered_keywords[i] #1 default key_index = int((len(filtered_keywords)/2)-1) if 'pussy' in filtered_keywords: key_index = filtered_keywords.index('pussy') if 'penis' in filtered_keywords: key_index = filtered_keywords.index('penis') filtered_keywords[key_index:key_index] = ['motion lines', 'surprised'] for keyword in removed_indices: if 'cum' not in keyword and 'ejaculation' not in keyword: filtered_keywords.insert(key_index,keyword) if(temp_sex_pos): filtered_keywords[key_index:key_index] = temp_sex_pos if('clothed sex' in filtered_keywords and not 'bottomless' in filtered_keywords): filtered_keywords.insert(filtered_keywords.index('clothed sex')+1, 'bottomless') pos1_copied_keywords = filtered_keywords.copy() for i, keyword in enumerate(pos1_copied_keywords): if 'closed eyes' in keyword: rand_num = random.randint(0,2) if(rand_num == 0): pos1_copied_keywords[i] = 'half-' + pos1_copied_keywords[i] elif(rand_num == 1 and 'closed eyes' in pos1_copied_keywords): pos1_copied_keywords.remove('closed eyes') filtered_keywords[i] = 'half-closed eyes' filtered_keywords_positive1 = pos1_copied_keywords.copy() #2 ejaculation,cum in pussy key_index = filtered_keywords.index('surprised') filtered_keywords.remove('surprised') filtered_keywords[key_index:key_index] = ["ejaculation","cum"] for keyword in removed_indices: if 'cum' in keyword: filtered_keywords.insert(key_index,keyword) if(temp_facial): filtered_keywords[key_index:key_index] =temp_facial filtered_keywords_positive2 = filtered_keywords.copy() #3 after sex, after ejaculation for i, keyword in enumerate(filtered_keywords): if 'closed eyes' in keyword: rand_num = random.randint(0,2) if(rand_num == 0 and filtered_keywords[i] != 'half-closed eyes'): filtered_keywords[i] = 'half-' + filtered_keywords[i] elif(rand_num == 1): filtered_keywords[i] = 'empty eyes' else: filtered_keywords[i] = 'empty eyes, half-closed eyes' if 'sex' in filtered_keywords: key_index = filtered_keywords.index('sex') elif 'group sex' in filtered_keywords: key_index = filtered_keywords.index('group sex') filtered_keywords.remove('ejaculation') filtered_keywords[key_index:key_index] = ['cum drip', 'erection'] + cum_events if(explicit_check): filtered_keywords[key_index:key_index] = explicit_check if 'sex' in filtered_keywords and 'group sex' not in filtered_keywords: if('pussy' in filtered_keywords and not 'anal' in filtered_keywords): filtered_keywords.insert(filtered_keywords.index('sex')+1, 'after vaginal, spread pussy') elif('anal' in filtered_keywords): filtered_keywords.insert(filtered_keywords.index('sex')+1, 'after anus, cum in ass') filtered_keywords.insert(filtered_keywords.index('sex'), 'after sex') filtered_keywords.remove('sex') elif 'group sex' in filtered_keywords: if('vaginal' in filtered_keywords and not 'anal' in filtered_keywords): filtered_keywords.insert(filtered_keywords.index('group sex')+1, 'after vaginal, spread pussy') if 'multiple penises' in filtered_keywords: filtered_keywords.insert(filtered_keywords.index('group sex')+3, 'cum on body, bukkake') elif('anal' in filtered_keywords): filtered_keywords.insert(filtered_keywords.index('group sex')+1, 'after anus, cum in ass') if 'multiple penises' in filtered_keywords: filtered_keywords.insert(filtered_keywords.index('group sex')+3, 'cum on body, bukkake') else: filtered_keywords.insert(filtered_keywords.index('group sex')+1, 'cum on body, {bukkake}') temp_post_keyword = [] for keyword in sex_pos_keywords: if not (keyword == 'orgasm' or keyword == 'overflow'): if keyword in filtered_keywords: temp_post_keyword.append(keyword) for keyword in temp_post_keyword: filtered_keywords.remove(keyword) positive0['prompt'] = ', '.join(insert_spaces(filtered_keywords_positive0, filtered_keywords)).strip() positive1['prompt'] = ', '.join(insert_spaces(filtered_keywords_positive1, filtered_keywords)).strip() positive2['prompt'] = ', '.join(insert_spaces(filtered_keywords_positive2, filtered_keywords)).strip() positive3['prompt'] = ', '.join(filtered_keywords).strip() positive0["type"] = "turbo" positive1["type"] = "turbo" positive2["type"] = "turbo" positive3["type"] = "turbo" return positive0, positive1, positive2, positive3 def generate_image_NAI(access_token, prompt, model, action, parameters): data = { "input": prompt, "model": model, "action": action, "parameters": parameters, } response = requests.post(f"{BASE_URL}/ai/generate-image", json=data, headers={ "Authorization": f"Bearer {access_token}" }) # catch any errors return response.content # WebUI def convert_prompt(prompt): return (prompt.replace('(','\\(').replace(')','\\)') # (tag) to \(tag\) .replace('{{', '(').replace('}}',')').replace('{', '(').replace('}', ')') # {{{tag}}} to ((tag)) .replace('[[', '[').replace(']]', ']')) # [[[tag]]] to [[tag]] def generate_image_webui(access_token, prompt, model, action, parameters): samplers = { "k_euler": "Euler", "k_euler_ancestral": "Euler a", "k_dpmpp_2s_ancestral": "DPM++ 2S a", "k_dpmpp_sde": "DPM++ SDE" } # matching data format data = { "input": prompt, "model": model, "action": action, "parameters": parameters, } params = { "prompt": convert_prompt(data['input']), "negative_prompt": convert_prompt(data['parameters']['negative_prompt']), "steps": data['parameters']['steps'], "width": data['parameters']['width'], "height": data['parameters']['height'], "cfg_scale": data['parameters']['scale'], "sampler_index": samplers[data['parameters']['sampler']], "seed": data['parameters']['seed'], "seed_resize_from_h": -1, "seed_resize_from_w": -1, "denoising_strength": None, "n_iter": "1", "batch_size": data['parameters']['n_samples'] } if data['parameters']['enable_hr'] == True: params['enable_hr'] = True params["hr_upscaler"] = data['parameters']["hr_upscaler"] params["hr_scale"] = data['parameters']["hr_scale"] params["hr_second_pass_steps"] = data['parameters']["hr_second_pass_steps"] params["denoising_strength"] = data['parameters']["denoising_strength"] res = requests.post(f"{access_token}/sdapi/v1/txt2img", json=params) imageb64s = res.json()['images'] content = None for b64 in imageb64s: img = b64.encode() content = base64.b64decode(img) s = io.BytesIO() zf = zipfile.ZipFile(s, "w") zf.writestr("generated", content) zf.close() return s.getvalue() # WebUI def generate_image(access_token, prompt, model, action, parameters): if re.match(r'^http[s]?://', access_token): return generate_image_webui(**locals()) return generate_image_NAI(**locals()) def generate(gen_request): params = { "legacy": False, "quality_toggle": True if gen_request["quality_toggle"] == 1 else False, "width": gen_request["width"], "height": gen_request["height"], "n_samples": 1, "seed": gen_request["seed"], "extra_noise_seed": random.randint(0,9999999999), "sampler": gen_request["sampler"], "steps": 28 if (gen_request["type"]!="upper" or "steps" not in gen_request) else gen_request["steps"], "scale": gen_request["scale"], "uncond_scale": 1.0, "negative_prompt": gen_request["negative"], "sm" : gen_request["sema"], "sm_dyn" : gen_request["sema_dyn"], "decrisper": False, "controlnet_strength": 1.0, "add_original_image": False, "cfg_rescale": gen_request["cfg_rescale"], "noise_schedule": "native", "enable_hr" : gen_request["enable_hr"] } if params["enable_hr"] == True: params["hr_upscaler"] = gen_request["hr_upscaler"] params["hr_scale"] = gen_request["hr_scale"] params["hr_second_pass_steps"] = gen_request["hr_second_pass_steps"] params["denoising_strength"] = gen_request["denoising_strength"] # 와일드카드 기능 만들어야함 positive = gen_request["prompt"] filename_rule = gen_request["png_rule"] save_folder = gen_request["save_folder"] access_token = gen_request["access_token"] additional_folder = "" def resize_and_fill(image, max_size=None): if max_size is None: max_size = gen_request["user_screen_size"] original_width, original_height = image.size if original_width > max_size or original_height > max_size: # 비율을 유지하면서 크기 조정 image.thumbnail((max_size, max_size)) # 새 이미지 크기 계산 width, height = image.size new_image = Image.new("RGB", (max_size, max_size), "black") new_image.paste(image, ((max_size - width) // 2, (max_size - height) // 2)) return new_image else: return image def log_error(e, output_file_path="output_file_path"): # 현재 시간을 얻습니다 current_time = datetime.now().strftime("%m/%d %H:%M:%S") # 에러 로그 메시지 error_message = f"#### Error occured at {current_time} ####\nError: {e}\n############################################\n" # 지정된 출력 폴더의 error_log.txt 파일에 쓰기 with open(f"error_log.txt", "a") as file: file.write(error_message) try: zipped_bytes = generate_image(access_token, positive, "nai-diffusion-3", "generate", params) if gen_request["png_rule"] == "count": additional_folder = "/" + gen_request["start_time"] if gen_request["type"] == "turbo": additional_folder += "/turbo" d = Path(save_folder + additional_folder) d.mkdir(parents=True, exist_ok=True) zipped = zipfile.ZipFile(io.BytesIO(zipped_bytes)) image_bytes = zipped.read(zipped.infolist()[0]) if gen_request["png_rule"] == "count": _count = gen_request["count"] filename = (d / f"{_count:05}.png" ) else: filename = (d / f"{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" ) filename.write_bytes(image_bytes) i = Image.open(io.BytesIO(image_bytes)) i = ImageOps.exif_transpose(i).convert("RGB") i_resized = resize_and_fill(i) #tk_image = ImageTk.PhotoImage(i_resized) return i_resized, positive, params['seed'], i.info, str(filename) except Exception as e: try: if zipped_bytes is None: raise ValueError("Connection broken (Protocol Error)") error_message = zipped_bytes.decode('utf-8')[2:-2] except Exception as inner_exception: error_message = str(inner_exception) log_error(error_message, "path_to_output_folder") return None, error_message, params['seed'], None, None