Spaces:
Runtime error
Runtime error
import itertools | |
import os | |
from collections import UserDict | |
from dataclasses import dataclass, field | |
from typing import Callable, Dict, Iterator, List, Optional, Union | |
from installer import log | |
class Directory: # forward declaration
    '''Placeholder so the aliases and annotations below can reference `Directory` before it is fully defined.'''


# Path-oriented aliases used in the signatures throughout this module
FilePathList = List[str]
FilePathIterator = Iterator[str]
DirectoryPathList = List[str]
DirectoryPathIterator = Iterator[str]
DirectoryList = List[Directory]
DirectoryIterator = Iterator[Directory]
DirectoryCollection = Dict[str, Directory]
# Predicate applied to a file path; returns whether the file is kept
ExtensionFilter = Callable[[str], bool]
# `typing.List` (not `list[...]`) to match the aliases above and stay importable before Python 3.9
ExtensionList = List[str]
# Either a plain on/off flag or a predicate called with a directory path
RecursiveType = Union[bool, Callable[[str], bool]]
def real_path(directory_path:str) -> Union[str, None]:
    '''Return `directory_path` with `~` expanded and made absolute, or None when it cannot be resolved.'''
    try:
        expanded = os.path.expanduser(directory_path)
        resolved = os.path.abspath(expanded)
    except Exception:
        # Best effort: any failure (for example a non-path argument) maps to None
        resolved = None
    return resolved
@dataclass(frozen=True)
class Directory(Directory): # pylint: disable=E0102
    '''Snapshot of one directory: its path, modification time, files and sub-directories.

    The dataclass is frozen, so `mtime` is written through `object.__setattr__`
    and the `files` / `directories` lists are refreshed in place rather than rebound.
    '''
    path: str = field(default_factory=str)
    # Modification time recorded when the snapshot was taken; not a constructor argument
    mtime: float = field(default_factory=float, init=False)
    files: FilePathList = field(default_factory=list)
    directories: DirectoryPathList = field(default_factory=list)

    def __post_init__(self):
        # Record the on-disk modification time at construction
        object.__setattr__(self, 'mtime', self.live_mtime)

    @classmethod
    def from_dict(cls, dict_object: dict) -> Directory:
        '''Build an instance from a mapping with `path`, `mtime`, `files` and `directories` keys.

        `__init__` / `__post_init__` are bypassed, so the stored `mtime` is used as given
        instead of being read from disk. Missing keys become None.
        '''
        directory = cls.__new__(cls)
        for attribute in ('path', 'mtime', 'files', 'directories'):
            object.__setattr__(directory, attribute, dict_object.get(attribute))
        return directory

    def clear(self) -> None:
        '''Empty this snapshot: drop cached sub-directories, clear both lists and zero `mtime`.'''
        self._update(Directory.from_dict({
            'path': None,
            'mtime': float(),
            'files': [],
            'directories': []
        }))

    def update(self, source_directory: Directory) -> Directory:
        '''Copy the contents of `source_directory` into this snapshot and return `self`.'''
        if source_directory is not self:
            self._update(source_directory)
        return self

    def _update(self, source:Directory) -> None:
        '''Replace files, directories and mtime with those of `source`.

        Sub-directories that no longer appear in `source` are removed from the cache.
        '''
        assert not source.path or source.path == self.path, f'When updating a directory, the paths must match.  Attemped to update Directory `{self.path}` with `{source.path}`'
        surviving = set(source.directories)
        for dead_path in self.directories:
            if dead_path not in surviving:
                delete_cached_directory(dead_path)
        # Slice assignment keeps the list objects, which is required on a frozen instance
        self.directories[:] = source.directories
        self.files[:] = source.files
        object.__setattr__(self, 'mtime', source.mtime)

    @property
    def exists(self) -> bool:
        '''True when `path` is set and present on disk.'''
        return bool(self.path) and os.path.exists(self.path)

    @property
    def is_directory(self) -> bool:
        '''True when `path` exists and is a directory.'''
        return self.exists and os.path.isdir(self.path)

    @property
    def live_mtime(self) -> float:
        '''Current on-disk modification time, or 0.0 when `path` is not a directory.'''
        return os.path.getmtime(self.path) if self.is_directory else 0.0

    @property
    def is_stale(self) -> bool:
        '''True when the directory is gone or its on-disk mtime differs from the recorded one.'''
        return not self.is_directory or self.mtime != self.live_mtime
class DirectoryCache(UserDict, DirectoryCollection):
    '''Mapping of directory path to its cached `Directory` snapshot.'''

    def __delattr__(self, directory_path: str) -> None:
        '''Drop `directory_path` and its cached sub-directories from the cache.

        NOTE(review): `del cache[key]` dispatches to `__delitem__`, not `__delattr__`,
        so this hook only runs for attribute deletion - confirm the intended trigger.
        '''
        directory: Directory = get_directory(directory_path, fetch=False)
        if directory:
            # `map()` is lazy, so iterate explicitly to actually evict the children
            for child_path in list(directory.directories):
                delete_cached_directory(child_path)
            directory.clear()
        # get_directory() may already have evicted the entry, so tolerate a missing key
        self.data.pop(directory_path, None)
def clean_directory(directory: Directory, /, recursive: RecursiveType=False) -> bool:
    '''Bring the cached snapshot `directory` up to date and report whether it already was.

    - If the path is no longer a directory, it is removed from the cache.
    - If the snapshot is stale, it is refreshed from disk.
    - Otherwise each known sub-directory is checked; with `recursive` enabled the
      children are cleaned too and vanished children are purged from the parent.

    `recursive` is either a flag or a predicate called with the child path.
    Returns True only when nothing had to be refreshed or purged.
    '''
    if not directory.is_directory:
        delete_cached_directory(directory.path)
        return False
    if directory.is_stale:
        refreshed = fetch_directory(directory.path)
        if refreshed is None:
            # The directory could not be scanned, so there is nothing valid to keep
            delete_cached_directory(directory.path)
        else:
            directory.update(refreshed)
        return False
    is_clean = True
    # Iterate over a copy: vanished children are removed from the list below
    for directory_path in directory.directories[:]:
        try:
            recurse = bool(recursive and (not callable(recursive) or recursive(directory_path)))
            # A distinct name keeps the parent `directory` intact for the purge below
            child = get_directory(directory_path, fetch=recurse)
            if child:
                if child.is_directory:
                    if recurse:
                        # Hand the original flag/predicate down so deeper levels are filtered too
                        is_clean = clean_directory(child, recursive=recursive) and is_clean
                    continue
                delete_cached_directory(directory_path)
            # If we had intended to fetch this directory, but didn't, that means it doesn't exist. Purge.
            if recurse:
                directory.directories.remove(directory_path)
                is_clean = False
        except Exception as err:
            # Best effort: one unreadable child must not abort cleaning of its siblings
            log.debug(f'Files cache clean failed: {directory_path} {err}')
    return is_clean
def get_directory(directory_or_path: str, /, fetch:bool=True) -> Union[Directory, None]:
    '''Return the cached `Directory` for a path (or `Directory`), or None when unavailable.

    A `Directory` argument that is still a directory is returned as is. Otherwise the
    path is resolved; a cached entry is cleaned first, and an uncached one is scanned
    and stored only when `fetch` is true.
    '''
    if isinstance(directory_or_path, Directory):
        if directory_or_path.is_directory:
            return directory_or_path
        directory_or_path = directory_or_path.path
    resolved_path = real_path(directory_or_path)
    cached = cache_folders.get(resolved_path, None)
    if cached:
        # Cleaning may refresh the entry or evict it, hence the second lookup below
        clean_directory(cached)
    elif fetch:
        fetched = fetch_directory(directory_path=resolved_path)
        if fetched:
            cache_folders[resolved_path] = fetched
    if resolved_path in cache_folders:
        return cache_folders[resolved_path]
    return None
def fetch_directory(directory_path: str) -> Union[Directory, None]:
    '''Scan `directory_path` once, without recursion, and return its `Directory`, or None if the scan yields nothing.'''
    # `_walk` is a generator; only its first item (the directory itself) is needed
    return next(_walk(directory_path, recurse=False), None)
def _walk(top, recurse:RecursiveType=True) -> Directory:
    '''Yield a fresh `Directory` for `top`, then for its sub-directories when `recurse` allows.

    A reimplementation of a directory walk on top of `os.scandir`. `recurse` is either a
    flag or a predicate called with each sub-directory path. Nothing is yielded when
    `top` cannot be scanned.
    '''
    try:
        entries = os.scandir(top)
    except OSError:
        return
    file_paths = []
    subdirectory_paths = []
    with entries:
        for entry in entries:
            if not entry.is_dir():
                file_paths.append(entry.path)
            elif entry.is_symlink() and not os.path.exists(entry.path):
                log.error(f'Files broken symlink: {entry.path}')
            else:
                subdirectory_paths.append(entry.path)
    yield Directory(top, file_paths, subdirectory_paths)
    if not recurse:
        return
    for child_path in subdirectory_paths:
        if callable(recurse) and not recurse(child_path):
            continue
        yield from _walk(child_path, recurse=recurse)
def _cached_walk(top, recurse:RecursiveType=True) -> Directory:
    '''Yield the cached `Directory` for `top`, then for its sub-directories when `recurse` allows.

    Sub-directories whose name starts with `models--` are never descended into.
    `recurse` is either a flag or a predicate called with each sub-directory path.
    '''
    root = get_directory(top)
    if not root:
        return
    yield root
    if not recurse:
        return
    for child_path in root.directories:
        child_name = os.path.basename(child_path)
        if child_name.startswith('models--'):
            continue
        if callable(recurse) and not recurse(child_path):
            continue
        yield from _cached_walk(child_path, recurse=recurse)
def walk(top, recurse:RecursiveType=True, cached=True) -> Directory:
    '''Yield `Directory` objects starting at `top`, using the cache when `cached` is true and a direct scan otherwise.'''
    walker = _cached_walk if cached else _walk
    yield from walker(top, recurse=recurse)
def delete_cached_directory(directory_path:str) -> bool:
    '''Remove `directory_path` from the module-level cache.

    Returns True when an entry was removed and False when the path was not cached.
    '''
    global cache_folders # pylint: disable=W0602
    if directory_path not in cache_folders:
        return False
    del cache_folders[directory_path]
    return True
def is_directory(dir_path:str) -> bool:
    '''Return True when `dir_path` is non-empty and names an existing directory.'''
    # `os.path.isdir` is already False for missing paths; `bool()` keeps the result a real bool
    return bool(dir_path) and os.path.isdir(dir_path)
def directory_mtime(directory_path:str, /, recursive:RecursiveType=True) -> float:
    '''Return the newest recorded `mtime` among the directories resolved from `directory_path`.

    The result is never below 0.0, and is 0.0 when no directory could be resolved.
    '''
    mtimes = (directory.mtime for directory in get_directories(directory_path, recursive=recursive))
    # `default=0` covers the case where nothing resolved, which `max(0, *[])` cannot handle
    return float(max(0, max(mtimes, default=0)))
def unique_directories(directories:DirectoryPathList, /, recursive:RecursiveType=True) -> DirectoryPathIterator:
    '''Yield the distinct, resolved directory paths in ascending sorted order.

    Empty and duplicate entries are removed by `unique_paths`. When `recursive` is
    truthy, a path nested inside an already yielded directory is skipped, because
    walking that parent reaches it anyway.

    NOTE: a callable `recursive` is treated like True here; the predicate is not
    consulted, so a nested path is dropped even if the predicate would skip it.
    '''
    kept_prefixes = []
    for directory in sorted(unique_paths(directories)):
        # Test against every kept parent: descendants need not sort next to their parent
        # (for example `/a`, `/a-b`, `/a/b`). An empty tuple never matches.
        if recursive and directory.startswith(tuple(kept_prefixes)):
            continue
        yield directory
        if recursive:
            # The trailing separator stops `/a` from matching the sibling `/ab`
            kept_prefixes.append(os.path.join(directory, ''))
def unique_paths(directory_paths:DirectoryPathList) -> DirectoryPathIterator:
    '''Resolve each non-empty path and return the distinct results in first-seen order (as a dict keys view).'''
    resolved_paths = (real_path(candidate) for candidate in directory_paths if candidate)
    # A dict preserves insertion order while discarding duplicates
    return dict.fromkeys(resolved for resolved in resolved_paths if resolved).keys()
def get_directories(*directory_paths: DirectoryPathList, fetch:bool=True, recursive:RecursiveType=True) -> DirectoryCollection:
    '''Return a lazy iterator over the `Directory` objects for the unique paths given, skipping those that cannot be resolved.'''
    root_paths = unique_directories(directory_paths, recursive=recursive)
    resolved = (get_directory(root_path, fetch=fetch) for root_path in root_paths)
    return filter(bool, resolved)
def directory_files(*directories_or_paths: Union[DirectoryPathList, DirectoryList], recursive: RecursiveType=True) -> FilePathIterator:
    '''Return a lazy iterator over the file paths of the given directories.

    Each argument is resolved through `get_directory`; unresolved ones are skipped.
    With `recursive` enabled the files of sub-directories follow those of their parent.
    A callable `recursive` is used as the filter deciding which sub-directories to enter.
    '''
    # Plain truthy flag: keep every non-empty child path; predicate: let it decide
    child_filter = recursive if callable(recursive) else bool

    def _descendant_files(directory_object):
        allowed_paths = filter(child_filter, directory_object.directories)
        children = filter(bool, map(get_directory, allowed_paths))
        return itertools.chain.from_iterable(
            directory_files(child, recursive=recursive) for child in children
        )

    def _all_files(directory_object):
        nested = _descendant_files(directory_object) if recursive else []
        return itertools.chain(directory_object.files, nested)

    resolved = filter(bool, map(get_directory, directories_or_paths))
    return itertools.chain.from_iterable(map(_all_files, resolved))
def extension_filter(ext_filter: Optional[ExtensionList]=None, ext_blacklist: Optional[ExtensionList]=None) -> ExtensionFilter:
    '''Build a case-insensitive predicate over file paths.

    A path passes when it ends with one of `ext_filter` (if any were given) and with
    none of `ext_blacklist` (if any were given).
    '''
    allowed_suffixes = tuple(map(str.upper, ext_filter)) if ext_filter else ()
    blocked_suffixes = tuple(map(str.upper, ext_blacklist)) if ext_blacklist else ()

    def filter_function(fp:str):
        # The path is only upper-cased when a list is actually in effect
        if allowed_suffixes and not fp.upper().endswith(allowed_suffixes):
            return False
        return not (blocked_suffixes and fp.upper().endswith(blocked_suffixes))

    return filter_function
def not_hidden(filepath: str) -> bool:
    '''Return True unless the final path component starts with a dot.'''
    file_name = os.path.basename(filepath)
    is_hidden = file_name.startswith('.')
    return not is_hidden
def filter_files(file_paths: FilePathList, ext_filter: Optional[ExtensionList]=None, ext_blacklist: Optional[ExtensionList]=None) -> FilePathIterator:
    '''Lazily keep the paths accepted by `extension_filter(ext_filter, ext_blacklist)`.'''
    keep = extension_filter(ext_filter, ext_blacklist)
    return filter(keep, file_paths)
def list_files(*directory_paths:DirectoryPathList, ext_filter: Optional[ExtensionList]=None, ext_blacklist: Optional[ExtensionList]=None, recursive:RecursiveType=True) -> FilePathIterator:
    '''Return a lazy iterator over the files under the given directories, filtered by extension.'''
    roots = get_directories(*directory_paths, recursive=recursive)
    files_per_root = (directory_files(root, recursive=recursive) for root in roots)
    every_file = itertools.chain.from_iterable(files_per_root)
    return filter_files(every_file, ext_filter, ext_blacklist)
# Module-level cache shared by the functions above; it starts out empty
cache_folders = DirectoryCache()