##~ DOWNLOADING CODE | BY: ANXETY ~##
#
# Colab/Kaggle cell: installs helper libraries, unpacks the Stable Diffusion
# WebUI repo, updates it and its extensions, then downloads models / VAEs /
# LoRAs / ControlNets / embeddings from widget-selected lists, CivitAi,
# Hugging Face, GitHub, Google Drive and user-supplied file.txt lists.
# Relies on names exported by `directory_setup` (models_dir, vaes_dir, ...)
# and on `get_ipython()` being available (IPython environment).

from directory_setup import *
from models_data import model_list, vae_list, controlnet_list

import os
import re
import time
import json
import shutil
import zipfile
import requests
import subprocess
from datetime import timedelta
from subprocess import getoutput
from IPython.utils import capture
from IPython.display import clear_output
from urllib.parse import urlparse, parse_qs


# Setup Env -- all paths/flags are injected by the launcher via environment.
env = os.getenv('ENV_NAME')
root_path = os.getenv('ROOT_PATH')
webui_path = os.getenv('WEBUI_PATH')
free_plan = os.getenv('FREE_PLAN')


# ================ LIBRARIES V2 ================
# A flag file marks that the (slow) one-time dependency install already ran.

flag_file = f"{root_path}/libraries_installed.txt"

if not os.path.exists(flag_file):
    print("💿 Установка библиотек, это займет какое-то время:\n")

    install_lib = {
        # "aria2": "apt -y install aria2",
        "aria2": "pip install aria2",
        "localtunnel": "npm install -g localtunnel",
        "insightface": "pip install insightface",
    }

    # Environment-specific extras, merged in on top of the common set.
    additional_libs = {
        "Google Colab": {
            "xformers": "pip install xformers==0.0.27 --no-deps",
        },
        "Kaggle": {
            "xformers": "pip install xformers==0.0.26.post1",
            # "torch": "pip install torch==2.1.2+cu121 torchvision==0.16.2+cu121 torchaudio==2.1.2 --extra-index-url https://download.pytorch.org/whl/cu121",
            # "aiohttp": "pip install trash-cli && trash-put /opt/conda/lib/python3.10/site-packages/aiohttp*"  # fix install req
        },
    }
    if env in additional_libs:
        install_lib.update(additional_libs[env])

    # Install each library, overwriting a single progress line.
    for index, (package, install_cmd) in enumerate(install_lib.items(), start=1):
        print(f"\r[{index}/{len(install_lib)}] \033[32m>>\033[0m Installing \033[33m{package}\033[0m..." + " "*35, end='')
        subprocess.run(install_cmd, shell=True, capture_output=True)

    # Additional specific binaries (tunnel helpers), output suppressed.
    with capture.capture_output() as cap:
        get_ipython().system('curl -s -OL https://github.com/DEX-1101/sd-webui-notebook/raw/main/res/new_tunnel --output-dir {root_path}')
        get_ipython().system('curl -s -Lo /usr/bin/cl https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64 && chmod +x /usr/bin/cl')
        get_ipython().system('curl -sLO https://github.com/openziti/zrok/releases/download/v0.4.32/zrok_0.4.32_linux_amd64.tar.gz && tar -xzf zrok_0.4.32_linux_amd64.tar.gz -C /usr/bin && rm -f zrok_0.4.32_linux_amd64.tar.gz')
    del cap

    clear_output()

    # Write the flag file so the install block is skipped next run.
    with open(flag_file, "w") as f:
        f.write(">W<'")

    print("🍪 Библиотеки установлены!" + " "*35)
    time.sleep(2)
    clear_output()


# ================= loading settings V4 =================

def load_settings(path):
    """Return the JSON settings dict at *path*, or {} if the file is absent."""
    if os.path.exists(path):
        with open(path, 'r') as file:
            return json.load(file)
    return {}

settings = load_settings(f'{root_path}/settings.json')

# Widget values saved by the settings cell; missing keys become None.
VARIABLES = [
    'model', 'model_num', 'inpainting_model',
    'vae', 'vae_num', 'latest_webui', 'latest_exstensions',
    'change_webui', 'detailed_download', 'controlnet',
    'controlnet_num', 'commit_hash', 'huggingface_token',
    'ngrok_token', 'zrok_token', 'commandline_arguments',
    'Model_url', 'Vae_url', 'LoRA_url', 'Embedding_url',
    'Extensions_url', 'custom_file_urls',
]
# At module level locals() is globals(), so this binds real module globals.
locals().update({key: settings.get(key) for key in VARIABLES})


# ================= OTHER =================
# Remember the session start time across cell re-runs.
try:
    start_colab
except NameError:  # fix: was a bare except; only "not yet defined" is expected
    start_colab = int(time.time()) - 5


# ================= MAIN CODE =================
if not os.path.exists(webui_path):
    start_install = int(time.time())
    print("⌚ Распаковка Stable Diffusion..." if change_webui != 'Forge' else "⌚ Распаковка Stable Diffusion (Forge)...", end='')

    with capture.capture_output() as cap:
        aria2_command = "aria2c --console-log-level=error -c -x 16 -s 16 -k 1M"
        url = "https://huggingface.co/NagisaNao/fast_repo/resolve/main/FULL_REPO.zip" if change_webui != 'Forge' else "https://huggingface.co/NagisaNao/fast_repo/resolve/main/FULL_REPO_forge.zip"
        get_ipython().system('{aria2_command} {url} -o repo.zip')
        get_ipython().system('unzip -q -o repo.zip -d {webui_path}')
        get_ipython().system('rm -rf repo.zip')

        get_ipython().run_line_magic('cd', '{root_path}')
        os.environ["SAFETENSORS_FAST_GPU"] = '1'
        os.environ["CUDA_MODULE_LOADING"] = "LAZY"
        os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
        os.environ["PYTHONWARNINGS"] = "ignore"

        # Persist session start so later runs can report elapsed time.
        get_ipython().system('echo -n {start_colab} > {webui_path}/static/colabTimer.txt')
    del cap

    install_time = timedelta(seconds=time.time() - start_install)
    # fix: use integer division so %02d receives ints (same displayed value).
    print("\r🚀 Распаковка Завершена! За", "%02d:%02d:%02d ⚡\n" % (install_time.seconds // 3600, (install_time.seconds // 60) % 60, install_time.seconds % 60), end='', flush=True)
else:
    print("🚀 Все распакованно... Пропуск. ⚡")
    start_colab = float(open(f'{webui_path}/static/colabTimer.txt', 'r').read())
    time_since_start = str(timedelta(seconds=time.time() - start_colab)).split('.')[0]
    print(f"⌚️ Вы проводите эту сессию в течение - \033[33m{time_since_start}\033[0m")


## Changes extensions and WebUi
if latest_webui or latest_exstensions:
    action = "Обновление WebUI и Расширений" if latest_webui and latest_exstensions else ("Обновление WebUI" if latest_webui else "Обновление Расширений")
    print(f"⌚️ {action}...", end='', flush=True)

    with capture.capture_output() as cap:
        # git identity is required for the rebase/pull below.
        get_ipython().system('git config --global user.email "you@example.com"')
        get_ipython().system('git config --global user.name "Your Name"')

        ## Update Webui
        if latest_webui:
            get_ipython().run_line_magic('cd', '{webui_path}')
            get_ipython().system('git restore .')
            get_ipython().system('git pull -X theirs --rebase --autostash')

        ## Update extensions
        if latest_exstensions:
            get_ipython().system('{\'for dir in \' + webui_path + \'/extensions/*/; do cd \\"$dir\\" && git reset --hard && git pull; done\'}')
    del cap
    print(f"\r✨ {action} Завершено!")


# === FIXING EXTENSIONS ===
anxety_repos = "https://huggingface.co/NagisaNao/fast_repo/resolve/main"

with capture.capture_output() as cap:
    # --- Umi-Wildcard ---
    get_ipython().system("sed -i '521s/open=\\(False\\|True\\)/open=False/' {webui_path}/extensions/Umi-AI-Wildcards/scripts/wildcard_recursive.py # Closed accordion by default")
    # --- Encrypt-Image ---
    get_ipython().system("sed -i '9,37d' {webui_path}/extensions/Encrypt-Image/javascript/encrypt_images_info.js # Removes the weird text in webui")
    # --- Additional-Networks ---
    get_ipython().system('wget -O {webui_path}/extensions/additional-networks/scripts/metadata_editor.py {anxety_repos}/extensions/Additional-Networks/fix/metadata_editor.py # Fixing an error due to old style')
del cap


## Version switching
if commit_hash:
    print('⏳ Активация машины времени...', end="", flush=True)
    with capture.capture_output() as cap:
        get_ipython().run_line_magic('cd', '{webui_path}')
        get_ipython().system('git config --global user.email "you@example.com"')
        get_ipython().system('git config --global user.name "Your Name"')
        get_ipython().system('git reset --hard {commit_hash}')
    del cap
    print(f"\r⌛️ Машина времени активированна! Текущий коммит: \033[34m{commit_hash}\033[0m")


## Downloading model and stuff | oh~ Hey! If you're freaked out by that code too, don't worry, me too!
print("📦 Скачивание моделей и прочего...", end='')

url = ""
# prefix -> destination directory for "prefix:link[name]" style entries.
PREFIXES = {
    "model": models_dir,
    "vae": vaes_dir,
    "lora": loras_dir,
    "embed": embeddings_dir,
    "extension": extensions_dir,
    "control": control_dir,
    "adetailer": adetailer_dir,
    "config": webui_path,
}
extension_repo = []
directories = list(PREFIXES.values())   # also scanned later for unpacking zip files
get_ipython().system('mkdir -p {" ".join(directories)}')

# Fallback token grants read access to gated HF repos when the user has none.
hf_token = huggingface_token if huggingface_token else "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"
user_header = f"\"Authorization: Bearer {hf_token}\""


''' Formatted Info Output '''

from math import floor

def center_text(text, terminal_width=45):
    """Return *text* padded to roughly center it in *terminal_width* columns, ANSI-colored."""
    padding = (terminal_width - len(text)) // 2
    return f"\033[1m\033[36m{' ' * padding}{text}{' ' * padding}\033[0m\033[32m"

def format_output(url, dst_dir, file_name):
    """Print a colored banner describing one pending download."""
    info = center_text(f"[{file_name.split('.')[0]}]")
    separation_line = '\033[32m' + '---' * 20

    print(f"\n{separation_line}{info}{separation_line}")
    print(f"\033[33mURL: \033[34m{url}")
    print(f"\033[33mSAVE DIR: \033[34m{dst_dir}")
    print(f"\033[33mFILE NAME: \033[34m{file_name}\033[0m")


''' GET CivitAi API - DATA '''

def CivitAi_API(url, file_name=None):
    """Resolve a civitai.com page/download link via the CivitAi REST API.

    Returns a 7-tuple:
      (tokenized download url, token-free url, model type, file name,
       preview image url, preview image name, raw API data)
    or seven Nones when the API gives no usable data.
    """
    SUPPORT_TYPES = ('Checkpoint', 'Model', 'TextualInversion', 'LORA')
    CIVITAI_TOKEN = "62c0c5956b2f9defbd844d754000180b"

    # Strip any user-supplied token, then splice in ours.
    url = url.split('?token=')[0] if '?token=' in url else url
    url = url.replace('?type=', f'?token={CIVITAI_TOKEN}&type=') if '?type=' in url else f"{url}?token={CIVITAI_TOKEN}"

    def get_model_data(url):
        # Model pages query /models/{id}; version links query /model-versions/{id}.
        if "civitai.com/models/" in url:
            if '?modelVersionId=' in url:
                version_id = url.split('?modelVersionId=')[1]
                return requests.get(f"https://civitai.com/api/v1/model-versions/{version_id}").json()
            else:
                model_id = url.split('/models/')[1].split('/')[0]
                return requests.get(f"https://civitai.com/api/v1/models/{model_id}").json()
        else:
            version_id = url.split('/models/')[1].split('/')[0]
            return requests.get(f"https://civitai.com/api/v1/model-versions/{version_id}").json()

    data = get_model_data(url)
    if not data:
        return None, None, None, None, None, None, None

    def extract_model_info(url, data):
        # The JSON shape differs per endpoint, hence the branching.
        if "civitai.com/models/" in url:
            if '?modelVersionId=' in url:
                model_type = data['model']['type']
                model_name = data['files'][0]['name']
            else:
                model_type = data['type']
                model_name = data['modelVersions'][0]['files'][0]['name']
        elif 'type=' in url:
            model_type = parse_qs(urlparse(url).query).get('type', [''])[0]
            if 'model' in model_type.lower():
                model_name = data['files'][0]['name']
            else:
                model_name = data['files'][1]['name']
        else:
            model_type = data['model']['type']
            model_name = data['files'][0]['name']
        return model_type, model_name

    model_type, model_name = extract_model_info(url, data)
    model_name = file_name or model_name  # explicit file name wins

    def get_download_url(url, data, model_type):
        if "civitai.com/models/" in url:
            if '?modelVersionId=' in url:
                return data.get('downloadUrl')
            else:
                return data["modelVersions"][0].get("downloadUrl", "")
        elif 'type=' in url:
            if any(t.lower() in model_type.lower() for t in SUPPORT_TYPES):
                return data['files'][0]['downloadUrl']
            else:
                return data['files'][1]['downloadUrl']
        else:
            return data.get('downloadUrl')

    download_url = get_download_url(url, data, model_type)
    clean_url = re.sub(r'[?&]token=[^&]*', '', download_url)  # token-free for display

    def get_image_info(data, model_type, model_name):
        # Pick a preview image; Kaggle gets only low-NSFW previews.
        image_url, image_name = None, None
        if any(t in model_type for t in SUPPORT_TYPES):
            try:
                images = data.get('images') or data['modelVersions'][0].get('images', [])
                if env == 'Kaggle':
                    image_url = next((image['url'] for image in images if image['nsfwLevel'] < 4), None)
                else:
                    image_url = images[0]['url'] if images else None
            except KeyError:
                pass
            image_name = f"{model_name.split('.')[0]}.preview.{image_url.split('.')[-1]}" if image_url else None
        return image_url, image_name

    image_url, image_name = get_image_info(data, model_type, model_name)

    return f"{download_url}{'&' if '?' in download_url else '?'}token={CIVITAI_TOKEN}", clean_url, model_type, model_name, image_url, image_name, data


''' Main Download Code '''

def strip_(url):
    """Normalize GitHub/HF web links into direct-download links."""
    if 'github.com' in url:
        return url.replace('/blob/', '/raw/')
    elif "huggingface.co" in url:
        url = url.replace('/blob/', '/resolve/')
        return url.split('?')[0] if '?' in url else url
    return url

def download(url):
    """Download every comma-separated entry in *url*, then unpack any zips."""
    links_and_paths = [link_or_path.strip() for link_or_path in url.split(',') if link_or_path.strip()]

    for link_or_path in links_and_paths:
        if any(link_or_path.lower().startswith(prefix) for prefix in PREFIXES):
            handle_manual(link_or_path)
        else:
            # Raw entry: "url dst_dir file_name"
            url, dst_dir, file_name = link_or_path.split()
            manual_download(url, dst_dir, file_name)

    # Unpack ZIP files into a directory named after the archive, then delete it.
    for directory in directories:
        for root, _, files in os.walk(directory):
            for file in files:
                if file.endswith(".zip"):
                    zip_path = os.path.join(root, file)
                    extract_path = os.path.splitext(zip_path)[0]
                    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                        zip_ref.extractall(extract_path)
                    os.remove(zip_path)

def handle_manual(url):
    """Handle a "prefix:link[optional_name]" entry by routing it to its directory."""
    url_parts = url.split(':', 1)
    prefix, path = url_parts[0], url_parts[1]

    file_name_match = re.search(r'\[(.*?)\]', path)
    file_name = file_name_match.group(1) if file_name_match else None
    if file_name:
        path = re.sub(r'\[.*?\]', '', path)

    if prefix in PREFIXES:
        dir = PREFIXES[prefix]
        if prefix != "extension":
            try:
                manual_download(path, dir, file_name=file_name)
            except Exception as e:
                print(f"Error downloading file: {e}")
        else:
            # Extensions are git repos; cloned in bulk after the downloads.
            extension_repo.append((path, file_name))

def manual_download(url, dst_dir, file_name):
    """Download one file from CivitAi / HF / GitHub / GDrive / plain HTTP into *dst_dir*."""
    header_option = f"--header={user_header}"
    aria2c_header = "--header='User-Agent: Mozilla/5.0' --allow-overwrite=true"
    aria2_args = "--optimize-concurrent-downloads --console-log-level=error --summary-interval=10 --stderr=true -c -x16 -s16 -k1M -j5"

    clean_url = strip_(url)

    if 'civitai' in url:
        url, clean_url, model_type, file_name, image_url, image_name, data = CivitAi_API(url, file_name)
        if data is None:
            # fix: the original fell through with url=None and crashed later;
            # report the API failure (message kept from original) and bail out.
            print("\033[31m[Data Info]:\033[0m Failed to retrieve data from the API.\n")
            return
        if image_url and image_name:
            # Fetch the preview image quietly alongside the model.
            command = ["aria2c"] + aria2_args.split() + ["-d", dst_dir, "-o", image_name, image_url]
            subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

    # fix: basename was only bound in the github/huggingface branch but read
    # unconditionally below -> NameError for other plain-HTTP urls.
    basename = url.split("/")[-1] if file_name is None else file_name

    """ Formatted info output """
    model_name_or_basename = file_name if file_name else basename
    format_output(clean_url or url, dst_dir, model_name_or_basename)
    # print(url, dst_dir, model_name_or_basename)

    if 'civitai' in url and data and image_name:
        print(f"\033[32m[Preview DL]:\033[0m {image_name} - {image_url}\n")

    # =====================
    def run_aria2c(url, dst_dir, file_name=None, args="", header=""):
        # Shared aria2c invocation; file_name may be empty (server-chosen name).
        out = f"-o '{file_name}'" if file_name else ""
        get_ipython().system("aria2c {header} {args} -d {dst_dir} {out} '{url}'")

    # -- Google Drive --
    if 'drive.google' in url:
        if not globals().get('have_drive_link', False):
            # Install gdown once per session.
            os.system("pip install -U gdown > /dev/null")
            globals()['have_drive_link'] = True

        if 'folders' in url:
            os.system(f"gdown --folder \"{url}\" -O {dst_dir} --fuzzy -c")
        else:
            out_path = f"{dst_dir}/{file_name}" if file_name else dst_dir
            os.system(f"gdown \"{url}\" -O {out_path} --fuzzy -c")

    # -- GitHub or Hugging Face --
    elif 'github' in url or 'huggingface' in url:
        run_aria2c(clean_url, dst_dir, basename, aria2_args, header_option if 'huggingface' in url else '')

    # -- Other HTTP/Sources --
    elif 'http' in url:
        run_aria2c(url, dst_dir, file_name, aria2_args, aria2c_header)


''' SubModels - Added URLs '''

def add_submodels(selection, num_selection, model_dict, dst_dir):
    """Return the de-duplicated model entries chosen by name and/or numbers.

    selection: "none", "ALL", or a key of model_dict.
    num_selection: extra 1-based group numbers, space/comma separated.
    Each returned entry gets 'dst_dir' set to *dst_dir*.
    """
    if selection == "none":
        return []
    if selection == "ALL":
        all_models = []
        for models in model_dict.values():
            all_models.extend(models)
        selected_models = all_models
    else:
        # fix: copy the list -- the original aliased model_dict[selection] and
        # extend() below corrupted the shared model data on repeated calls.
        selected_models = list(model_dict[selection])

        selected_nums = map(int, num_selection.replace(',', '').split())
        for num in selected_nums:
            if 1 <= num <= len(model_dict):
                name = list(model_dict)[num - 1]
                selected_models.extend(model_dict[name])

    # De-duplicate by model name, keeping the last occurrence.
    unique_models = list({model['name']: model for model in selected_models}.values())
    for model in unique_models:
        model['dst_dir'] = dst_dir

    return unique_models

def handle_submodels(selection, num_selection, model_dict, dst_dir, url):
    """Append "url dst name, " entries for the selected submodels to *url*."""
    submodels = add_submodels(selection, num_selection, model_dict, dst_dir)
    for submodel in submodels:
        if not inpainting_model and "inpainting" in submodel['name']:
            continue  # inpainting variants are opt-in
        url += f"{submodel['url']} {submodel['dst_dir']} {submodel['name']}, "
    return url

url = handle_submodels(model, model_num, model_list, models_dir, url)
url = handle_submodels(vae, vae_num, vae_list, vaes_dir, url)
url = handle_submodels(controlnet, controlnet_num, controlnet_list, control_dir, url)


''' file.txt - added urls '''

def process_file_download(file_url, PREFIXES, unique_urls):
    """Parse a file.txt (local path or http) into "tag:url, " entries.

    Lines like "# model" switch the current tag; urls already seen (ignoring
    a trailing [name]) are skipped via *unique_urls*.
    """
    files_urls = ""

    if file_url.startswith("http"):
        if "blob" in file_url:
            file_url = file_url.replace("blob", "raw")
        response = requests.get(file_url)
        lines = response.text.split('\n')
    else:
        with open(file_url, 'r') as file:
            lines = file.readlines()

    current_tag = None
    for line in lines:
        line = line.strip()

        if any(f'# {tag}' in line.lower() for tag in PREFIXES):
            current_tag = next((tag for tag in PREFIXES if tag in line.lower()))

        urls = [url.split('#')[0].strip() for url in line.split(',')]  # filter urls
        for url in urls:
            filter_url = url.split('[')[0]  # same url filter
            if url.startswith("http") and filter_url not in unique_urls:
                files_urls += f"{current_tag}:{url}, "
                unique_urls.add(filter_url)

    return files_urls

file_urls = ""
unique_urls = set()

if custom_file_urls:
    for custom_file_url in custom_file_urls.replace(',', '').split():
        if not custom_file_url.endswith('.txt'):
            custom_file_url += '.txt'
        if not custom_file_url.startswith('http'):
            if not custom_file_url.startswith(root_path):
                custom_file_url = f'{root_path}/{custom_file_url}'

        try:
            file_urls += process_file_download(custom_file_url, PREFIXES, unique_urls)
        except FileNotFoundError:
            pass

# url prefixing: widget url boxes map positionally onto the first PREFIXES keys.
urls = (Model_url, Vae_url, LoRA_url, Embedding_url, Extensions_url)
prefixed_urls = (f"{prefix}:{url}" for prefix, url in zip(PREFIXES.keys(), urls) if url for url in url.replace(',', '').split())
url += ", ".join(prefixed_urls) + ", " + file_urls

if detailed_download == "on":
    print("\n\n\033[33m# ====== Подробная Загрузка ====== #\n\033[0m")
    download(url)
    print("\n\033[33m# =============================== #\n\033[0m")
else:
    with capture.capture_output() as cap:
        download(url)
    del cap

print("\r🏁 Скачивание Завершено!" + " "*15)


# Clean up aria2 control files and notebook checkpoints left behind.
get_ipython().system('find {webui_path} \\( -type d \\( -name ".ipynb_checkpoints" -o -name ".aria2" \\) -o -type f -name "*.aria2" \\) -exec rm -r {{}} \\; >/dev/null 2>&1')


## Install of Custom extensions
if len(extension_repo) > 0:
    print("✨ Установка кастомных расширений...", end='', flush=True)
    with capture.capture_output() as cap:
        for repo, repo_name in extension_repo:
            if not repo_name:
                repo_name = repo.split('/')[-1]
            get_ipython().system('cd {extensions_dir} && git clone {repo} {repo_name} && cd {repo_name} && git fetch')
    del cap
    print(f"\r📦 Установлено '{len(extension_repo)}', Кастомных расширений!")


## List Models and stuff V2
if detailed_download == "off":
    print("\n\n\033[33mЕсли вы не видете каких-то скаченных файлов, включите в виджетах функцию 'Подробная Загрузка'.")

get_ipython().run_line_magic('run', '{root_path}/file_cell/special/dl_display_results.py # display widgets result')