Spaces:
Runtime error
Runtime error
File size: 11,882 Bytes
c19ca42 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 |
import itertools
import os
from collections import UserDict
from dataclasses import dataclass, field
from typing import Callable, Dict, Iterator, List, Optional, Union
from installer import log
class Directory: # forward declaration
...
FilePathList = List[str]
FilePathIterator = Iterator[str]
DirectoryPathList = List[str]
DirectoryPathIterator = Iterator[str]
DirectoryList = List[Directory]
DirectoryIterator = Iterator[Directory]
DirectoryCollection = Dict[str, Directory]
ExtensionFilter = Callable
ExtensionList = list[str]
RecursiveType = Union[bool,Callable]
def real_path(directory_path:str) -> Union[str, None]:
try:
return os.path.abspath(os.path.expanduser(directory_path))
except Exception:
pass
return None
@dataclass(frozen=True)
class Directory(Directory): # pylint: disable=E0102
path: str = field(default_factory=str)
mtime: float = field(default_factory=float, init=False)
files: FilePathList = field(default_factory=list)
directories: DirectoryPathList = field(default_factory=list)
def __post_init__(self):
object.__setattr__(self, 'mtime', self.live_mtime)
@classmethod
def from_dict(cls, dict_object: dict) -> Directory:
directory = cls.__new__(cls)
object.__setattr__(directory, 'path', dict_object.get('path'))
object.__setattr__(directory, 'mtime', dict_object.get('mtime'))
object.__setattr__(directory, 'files', dict_object.get('files'))
object.__setattr__(directory, 'directories', dict_object.get('directories'))
return directory
def clear(self) -> None:
self._update(Directory.from_dict({
'path': None,
'mtime': float(),
'files': [],
'directories': []
}))
def update(self, source_directory: Directory) -> Directory:
if source_directory is not self:
self._update(source_directory)
return self
def _update(self, source:Directory) -> None:
assert not source.path or source.path == self.path, f'When updating a directory, the paths must match. Attemped to update Directory `{self.path}` with `{source.path}`'
for dead_path in self.directories:
if dead_path not in source.directories:
delete_cached_directory(dead_path)
self.directories[:] = source.directories
self.files[:] = source.files
object.__setattr__(self, 'mtime', source.mtime)
@property
def exists(self) -> bool:
return self.path and os.path.exists(self.path)
@property
def is_directory(self) -> bool:
return self.exists and os.path.isdir(self.path)
@property
def live_mtime(self) -> float:
return os.path.getmtime(self.path) if self.is_directory else 0
@property
def is_stale(self) -> bool:
return not self.is_directory or self.mtime != self.live_mtime
class DirectoryCache(UserDict, DirectoryCollection):
def __delattr__(self, directory_path: str) -> None:
directory: Directory = get_directory(directory_path, fetch=False)
if directory:
map(delete_cached_directory, directory.directories)
directory.clear()
del self.data[directory_path]
def clean_directory(directory: Directory, /, recursive: RecursiveType=False) -> bool:
if not directory.is_directory:
is_clean = False
delete_cached_directory(directory.path)
else:
is_clean = not directory.is_stale
if not is_clean:
directory.update(fetch_directory(directory.path))
else:
for directory_path in directory.directories[:]:
try:
recurse = recursive and (not callable(recursive) or recursive(directory.path))
directory = get_directory(directory_path, fetch=recurse)
if directory:
if directory.is_directory:
if recurse:
is_clean = clean_directory(directory, recursive=recurse) and is_clean
continue
delete_cached_directory(directory_path)
# If we had intended to fetch this directory, but didn't, that means it doesn't exist. Purge.
if recurse:
directory.directories.remove(directory_path)
is_clean = False
except Exception:
pass
return is_clean
def get_directory(directory_or_path: str, /, fetch:bool=True) -> Union[Directory, None]:
if isinstance(directory_or_path, Directory):
if directory_or_path.is_directory:
return directory_or_path
else:
directory_or_path = directory_or_path.path
directory_or_path = real_path(directory_or_path)
if not cache_folders.get(directory_or_path, None):
if fetch:
directory = fetch_directory(directory_path=directory_or_path)
if directory:
cache_folders[directory_or_path] = directory
else:
clean_directory(cache_folders[directory_or_path])
return cache_folders[directory_or_path] if directory_or_path in cache_folders else None
def fetch_directory(directory_path: str) -> Union[Directory, None]:
directory: Directory
for directory in _walk(directory_path, recurse=False):
return directory # The return is intentional, we get a generator, we only need the one
return None
def _walk(top, recurse:RecursiveType=True) -> Directory:
# reimplemented `path.walk()`
nondirs = []
walk_dirs = []
try:
scandir_it = os.scandir(top)
except OSError:
return
with scandir_it:
while True:
try:
entry = next(scandir_it)
except StopIteration:
break
if not entry.is_dir():
nondirs.append(entry.path)
else:
if entry.is_symlink() and not os.path.exists(entry.path):
log.error(f'Files broken symlink: {entry.path}')
else:
walk_dirs.append(entry.path)
yield Directory(top, nondirs, walk_dirs)
if recurse:
for new_path in walk_dirs:
if callable(recurse) and not recurse(new_path):
continue
yield from _walk(new_path, recurse=recurse)
def _cached_walk(top, recurse:RecursiveType=True) -> Directory:
top = get_directory(top)
if not top:
return
yield top
if recurse:
for child_directory in top.directories:
if os.path.basename(child_directory).startswith('models--'):
continue
if callable(recurse) and not recurse(child_directory):
continue
yield from _cached_walk(child_directory, recurse=recurse)
def walk(top, recurse:RecursiveType=True, cached=True) -> Directory:
yield from _cached_walk(top, recurse=recurse) if cached else _walk(top, recurse=recurse)
def delete_cached_directory(directory_path:str) -> bool:
global cache_folders # pylint: disable=W0602
if directory_path in cache_folders:
del cache_folders[directory_path]
def is_directory(dir_path:str) -> bool:
return dir_path and os.path.exists(dir_path) and os.path.isdir(dir_path)
def directory_mtime(directory_path:str, /, recursive:RecursiveType=True) -> float:
return float(max(0, *[directory.mtime for directory in get_directories(directory_path, recursive=recursive)]))
def unique_directories(directories:DirectoryPathList, /, recursive:RecursiveType=True) -> DirectoryPathIterator:
'''Ensure no empty, or duplicates'''
'''If we are going recursive, then directories that are children of other directories are redundant'''
''' @todo this is incredibly inneficient. the hit is small, but it is ugly, no? '''
directories = sorted(unique_paths(directories), reverse=True)
while directories:
directory = directories.pop()
yield directory
if not recursive:
continue
_directory = os.path.join(directory, '')
child_directory = None
while directories and directories[-1].startswith(_directory):
if not callable(recursive) or not child_directory:
directories.pop()
continue
child_directory = directories[-1][len(directory):]
if child_directory:
next_directory = _directory
if not callable(recursive):
_remove_directory = next_directory
else:
for sub_directory in child_directory.split(os.path.sep):
next_directory = os.path.join(next_directory, sub_directory)
if recursive(next_directory):
_remove_directory = os.path.join(next_directory, '')
break
while _remove_directory and directories:
_d = directories.pop()
if not directories[-1].startswith(_remove_directory):
del _remove_directory
def unique_paths(directory_paths:DirectoryPathList) -> DirectoryPathIterator:
realpaths = (real_path(directory_path) for directory_path in filter(bool, directory_paths))
return {real_directory_path: True for real_directory_path in filter(bool, realpaths)}.keys()
def get_directories(*directory_paths: DirectoryPathList, fetch:bool=True, recursive:RecursiveType=True) -> DirectoryCollection:
directory_paths = unique_directories(directory_paths, recursive=recursive)
directories = (get_directory(directory_path, fetch=fetch) for directory_path in directory_paths)
return filter(bool, directories)
def directory_files(*directories_or_paths: Union[DirectoryPathList, DirectoryList], recursive: RecursiveType=True) -> FilePathIterator:
return itertools.chain.from_iterable(
itertools.chain(
directory_object.files,
[]
if not recursive
else itertools.chain.from_iterable(
directory_files(directory, recursive=recursive)
for directory
in filter(
bool,
map(get_directory, filter(((bool if recursive else False) if not callable(recursive) else recursive), directory_object.directories))
)
)
)
for directory_object
in filter(bool, map(get_directory, directories_or_paths))
)
def extension_filter(ext_filter: Optional[ExtensionList]=None, ext_blacklist: Optional[ExtensionList]=None) -> ExtensionFilter:
if ext_filter:
ext_filter = [*map(str.upper, ext_filter)]
if ext_blacklist:
ext_blacklist = [*map(str.upper, ext_blacklist)]
def filter_functon(fp:str):
return (not ext_filter or any(fp.upper().endswith(ew) for ew in ext_filter)) and (not ext_blacklist or not any(fp.upper().endswith(ew) for ew in ext_blacklist))
return filter_functon
def not_hidden(filepath: str) -> bool:
return not os.path.basename(filepath).startswith('.')
def filter_files(file_paths: FilePathList, ext_filter: Optional[ExtensionList]=None, ext_blacklist: Optional[ExtensionList]=None) -> FilePathIterator:
return filter(extension_filter(ext_filter, ext_blacklist), file_paths)
def list_files(*directory_paths:DirectoryPathList, ext_filter: Optional[ExtensionList]=None, ext_blacklist: Optional[ExtensionList]=None, recursive:RecursiveType=True) -> FilePathIterator:
return filter_files(itertools.chain.from_iterable(
directory_files(directory, recursive=recursive)
for directory in get_directories(*directory_paths, recursive=recursive)
), ext_filter, ext_blacklist)
cache_folders = DirectoryCache({})
|