Spaces:
Runtime error
Runtime error
import copy | |
import hashlib | |
import os.path | |
from rich import progress, errors | |
from modules import shared | |
from modules.paths import data_path | |
# Location of the persistent JSON hash cache, under the user data directory
cache_filename = os.path.join(data_path, "cache.json")
# In-memory cache contents; None until lazily loaded from disk by cache()
cache_data = None
# Whether the rich progress display is usable; flipped to False for the rest
# of the process once a LiveError is seen (e.g. hashing from a worker thread)
progress_ok = True
def dump_cache():
    """Persist the in-memory cache dict to the on-disk cache file."""
    shared.writefile(cache_data, cache_filename)
def cache(subsection):
    """Return the mutable dict for *subsection* of the hash cache.

    Loads the cache file from disk on first use; creates the subsection
    entry if it does not yet exist. Callers mutate the returned dict
    in place and then call dump_cache() to persist it.
    """
    global cache_data  # pylint: disable=global-statement
    if cache_data is None:
        if os.path.isfile(cache_filename):
            cache_data = shared.readfile(cache_filename, lock=True)
        else:
            cache_data = {}
    # setdefault both registers and returns the subsection dict in one step
    return cache_data.setdefault(subsection, {})
def _hash_file_plain(filename, blksize):
    """Hash *filename* with a plain chunked read loop (no progress display)."""
    hash_sha256 = hashlib.sha256()
    with open(filename, 'rb') as f:
        for chunk in iter(lambda: f.read(blksize), b""):
            hash_sha256.update(chunk)
    return hash_sha256.hexdigest()


def calculate_sha256(filename, quiet=False):
    """Return the hex SHA-256 digest of *filename*.

    Shows a rich progress bar unless *quiet* is True. If rich raises
    LiveError (another live display is active, e.g. when called from a
    thread), the progress path is disabled for the rest of the process
    and hashing restarts from scratch with a plain read loop.
    """
    global progress_ok  # pylint: disable=global-statement
    blksize = 1024 * 1024
    if not quiet and progress_ok:
        hash_sha256 = hashlib.sha256()
        try:
            with progress.open(filename, 'rb', description='[cyan]Calculating hash: [yellow](unknown)', auto_refresh=True, console=shared.console) as f:
                for chunk in iter(lambda: f.read(blksize), b""):
                    hash_sha256.update(chunk)
            return hash_sha256.hexdigest()
        except errors.LiveError:
            shared.log.warning('Hash: attempting to use function in a thread')
            progress_ok = False
            # Fall through to the plain path with a FRESH hasher: any chunks
            # consumed before the failure must not be hashed twice (the
            # original code reused the partially-fed hasher here, which
            # would produce a wrong digest if any data had been read).
    return _hash_file_plain(filename, blksize)
def sha256_from_cache(filename, title, use_addnet_hash=False):
    """Return the cached sha256 for *title*, or None on miss or stale entry.

    An entry is stale when the file on disk has a newer mtime than the one
    recorded alongside the cached hash.
    """
    section = "hashes-addnet" if use_addnet_hash else "hashes"
    hashes = cache(section)
    if title not in hashes:
        return None
    entry = hashes[title]
    cached_sha256 = entry.get("sha256", None)
    cached_mtime = entry.get("mtime", 0)
    ondisk_mtime = os.path.getmtime(filename) if os.path.isfile(filename) else 0
    if cached_sha256 is None or ondisk_mtime > cached_mtime:
        return None
    return cached_sha256
def _addnet_hash_with_progress(filename):
    """Compute the kohya-ss addnet hash of *filename*, with a progress bar when available."""
    global progress_ok  # pylint: disable=global-statement
    if progress_ok:
        try:
            with progress.open(filename, 'rb', description='[cyan]Calculating hash: [yellow](unknown)', auto_refresh=True, console=shared.console) as f:
                return addnet_hash_safetensors(f)
        except errors.LiveError:
            shared.log.warning('Hash: attempting to use function in a thread')
            progress_ok = False
    # Plain fallback; addnet_hash_safetensors seeks to 0 itself, so a partially
    # consumed progress read cannot corrupt the result.
    with open(filename, 'rb') as f:
        return addnet_hash_safetensors(f)


def sha256(filename, title, use_addnet_hash=False):
    """Return the sha256 for *filename*, computing and caching it under *title*.

    Returns the cached value when fresh; returns None when hashing is disabled
    via --no-hashing or the file does not exist. Newly computed hashes are
    written back to the on-disk cache.
    """
    hashes = cache("hashes-addnet") if use_addnet_hash else cache("hashes")
    sha256_value = sha256_from_cache(filename, title, use_addnet_hash)
    if sha256_value is not None:
        return sha256_value
    if shared.cmd_opts.no_hashing:
        return None
    if not os.path.isfile(filename):
        return None
    orig_state = copy.deepcopy(shared.state)
    shared.state.begin("hash")
    try:
        if use_addnet_hash:
            sha256_value = _addnet_hash_with_progress(filename)
        else:
            sha256_value = calculate_sha256(filename)
        hashes[title] = {
            "mtime": os.path.getmtime(filename),
            "sha256": sha256_value,
        }
    finally:
        # Always restore the saved state, even if hashing raises; the original
        # code leaked the "hash" state on any exception.
        shared.state.end()
        shared.state = orig_state
    dump_cache()
    return sha256_value
def addnet_hash_safetensors(b):
    """kohya-ss hash for safetensors from https://github.com/kohya-ss/sd-scripts/blob/main/library/train_util.py"""
    digest = hashlib.sha256()
    chunk_size = 1024 * 1024
    # First 8 bytes are a little-endian length of the json header; the hash
    # covers only the tensor data that follows the header.
    b.seek(0)
    header_len = int.from_bytes(b.read(8), "little")
    b.seek(header_len + 8)
    while chunk := b.read(chunk_size):
        digest.update(chunk)
    return digest.hexdigest()