Spaces:
Running
Running
import io | |
import posixpath | |
import zipfile | |
import itertools | |
import contextlib | |
import sys | |
import pathlib | |
if sys.version_info < (3, 7): | |
from collections import OrderedDict | |
else: | |
OrderedDict = dict | |
__all__ = ['Path'] | |
def _parents(path): | |
""" | |
Given a path with elements separated by | |
posixpath.sep, generate all parents of that path. | |
>>> list(_parents('b/d')) | |
['b'] | |
>>> list(_parents('/b/d/')) | |
['/b'] | |
>>> list(_parents('b/d/f/')) | |
['b/d', 'b'] | |
>>> list(_parents('b')) | |
[] | |
>>> list(_parents('')) | |
[] | |
""" | |
return itertools.islice(_ancestry(path), 1, None) | |
def _ancestry(path): | |
""" | |
Given a path with elements separated by | |
posixpath.sep, generate all elements of that path | |
>>> list(_ancestry('b/d')) | |
['b/d', 'b'] | |
>>> list(_ancestry('/b/d/')) | |
['/b/d', '/b'] | |
>>> list(_ancestry('b/d/f/')) | |
['b/d/f', 'b/d', 'b'] | |
>>> list(_ancestry('b')) | |
['b'] | |
>>> list(_ancestry('')) | |
[] | |
""" | |
path = path.rstrip(posixpath.sep) | |
while path and path != posixpath.sep: | |
yield path | |
path, tail = posixpath.split(path) | |
_dedupe = OrderedDict.fromkeys | |
"""Deduplicate an iterable in original order""" | |
def _difference(minuend, subtrahend): | |
""" | |
Return items in minuend not in subtrahend, retaining order | |
with O(1) lookup. | |
""" | |
return itertools.filterfalse(set(subtrahend).__contains__, minuend) | |
class CompleteDirs(zipfile.ZipFile): | |
""" | |
A ZipFile subclass that ensures that implied directories | |
are always included in the namelist. | |
""" | |
def _implied_dirs(names): | |
parents = itertools.chain.from_iterable(map(_parents, names)) | |
as_dirs = (p + posixpath.sep for p in parents) | |
return _dedupe(_difference(as_dirs, names)) | |
def namelist(self): | |
names = super(CompleteDirs, self).namelist() | |
return names + list(self._implied_dirs(names)) | |
def _name_set(self): | |
return set(self.namelist()) | |
def resolve_dir(self, name): | |
""" | |
If the name represents a directory, return that name | |
as a directory (with the trailing slash). | |
""" | |
names = self._name_set() | |
dirname = name + '/' | |
dir_match = name not in names and dirname in names | |
return dirname if dir_match else name | |
def make(cls, source): | |
""" | |
Given a source (filename or zipfile), return an | |
appropriate CompleteDirs subclass. | |
""" | |
if isinstance(source, CompleteDirs): | |
return source | |
if not isinstance(source, zipfile.ZipFile): | |
return cls(_pathlib_compat(source)) | |
# Only allow for FastLookup when supplied zipfile is read-only | |
if 'r' not in source.mode: | |
cls = CompleteDirs | |
source.__class__ = cls | |
return source | |
class FastLookup(CompleteDirs): | |
""" | |
ZipFile subclass to ensure implicit | |
dirs exist and are resolved rapidly. | |
""" | |
def namelist(self): | |
with contextlib.suppress(AttributeError): | |
return self.__names | |
self.__names = super(FastLookup, self).namelist() | |
return self.__names | |
def _name_set(self): | |
with contextlib.suppress(AttributeError): | |
return self.__lookup | |
self.__lookup = super(FastLookup, self)._name_set() | |
return self.__lookup | |
def _pathlib_compat(path): | |
""" | |
For path-like objects, convert to a filename for compatibility | |
on Python 3.6.1 and earlier. | |
""" | |
try: | |
return path.__fspath__() | |
except AttributeError: | |
return str(path) | |
class Path: | |
""" | |
A pathlib-compatible interface for zip files. | |
Consider a zip file with this structure:: | |
. | |
βββ a.txt | |
βββ b | |
βββ c.txt | |
βββ d | |
βββ e.txt | |
>>> data = io.BytesIO() | |
>>> zf = zipfile.ZipFile(data, 'w') | |
>>> zf.writestr('a.txt', 'content of a') | |
>>> zf.writestr('b/c.txt', 'content of c') | |
>>> zf.writestr('b/d/e.txt', 'content of e') | |
>>> zf.filename = 'mem/abcde.zip' | |
Path accepts the zipfile object itself or a filename | |
>>> root = Path(zf) | |
From there, several path operations are available. | |
Directory iteration (including the zip file itself): | |
>>> a, b = root.iterdir() | |
>>> a | |
Path('mem/abcde.zip', 'a.txt') | |
>>> b | |
Path('mem/abcde.zip', 'b/') | |
name property: | |
>>> b.name | |
'b' | |
join with divide operator: | |
>>> c = b / 'c.txt' | |
>>> c | |
Path('mem/abcde.zip', 'b/c.txt') | |
>>> c.name | |
'c.txt' | |
Read text: | |
>>> c.read_text() | |
'content of c' | |
existence: | |
>>> c.exists() | |
True | |
>>> (b / 'missing.txt').exists() | |
False | |
Coercion to string: | |
>>> import os | |
>>> str(c).replace(os.sep, posixpath.sep) | |
'mem/abcde.zip/b/c.txt' | |
At the root, ``name``, ``filename``, and ``parent`` | |
resolve to the zipfile. Note these attributes are not | |
valid and will raise a ``ValueError`` if the zipfile | |
has no filename. | |
>>> root.name | |
'abcde.zip' | |
>>> str(root.filename).replace(os.sep, posixpath.sep) | |
'mem/abcde.zip' | |
>>> str(root.parent) | |
'mem' | |
""" | |
__repr = "{self.__class__.__name__}({self.root.filename!r}, {self.at!r})" | |
def __init__(self, root, at=""): | |
""" | |
Construct a Path from a ZipFile or filename. | |
Note: When the source is an existing ZipFile object, | |
its type (__class__) will be mutated to a | |
specialized type. If the caller wishes to retain the | |
original type, the caller should either create a | |
separate ZipFile object or pass a filename. | |
""" | |
self.root = FastLookup.make(root) | |
self.at = at | |
def open(self, mode='r', *args, pwd=None, **kwargs): | |
""" | |
Open this entry as text or binary following the semantics | |
of ``pathlib.Path.open()`` by passing arguments through | |
to io.TextIOWrapper(). | |
""" | |
if self.is_dir(): | |
raise IsADirectoryError(self) | |
zip_mode = mode[0] | |
if not self.exists() and zip_mode == 'r': | |
raise FileNotFoundError(self) | |
stream = self.root.open(self.at, zip_mode, pwd=pwd) | |
if 'b' in mode: | |
if args or kwargs: | |
raise ValueError("encoding args invalid for binary operation") | |
return stream | |
return io.TextIOWrapper(stream, *args, **kwargs) | |
def name(self): | |
return pathlib.Path(self.at).name or self.filename.name | |
def suffix(self): | |
return pathlib.Path(self.at).suffix or self.filename.suffix | |
def suffixes(self): | |
return pathlib.Path(self.at).suffixes or self.filename.suffixes | |
def stem(self): | |
return pathlib.Path(self.at).stem or self.filename.stem | |
def filename(self): | |
return pathlib.Path(self.root.filename).joinpath(self.at) | |
def read_text(self, *args, **kwargs): | |
with self.open('r', *args, **kwargs) as strm: | |
return strm.read() | |
def read_bytes(self): | |
with self.open('rb') as strm: | |
return strm.read() | |
def _is_child(self, path): | |
return posixpath.dirname(path.at.rstrip("/")) == self.at.rstrip("/") | |
def _next(self, at): | |
return self.__class__(self.root, at) | |
def is_dir(self): | |
return not self.at or self.at.endswith("/") | |
def is_file(self): | |
return self.exists() and not self.is_dir() | |
def exists(self): | |
return self.at in self.root._name_set() | |
def iterdir(self): | |
if not self.is_dir(): | |
raise ValueError("Can't listdir a file") | |
subs = map(self._next, self.root.namelist()) | |
return filter(self._is_child, subs) | |
def __str__(self): | |
return posixpath.join(self.root.filename, self.at) | |
def __repr__(self): | |
return self.__repr.format(self=self) | |
def joinpath(self, *other): | |
next = posixpath.join(self.at, *map(_pathlib_compat, other)) | |
return self._next(self.root.resolve_dir(next)) | |
__truediv__ = joinpath | |
def parent(self): | |
if not self.at: | |
return self.filename.parent | |
parent_at = posixpath.dirname(self.at.rstrip('/')) | |
if parent_at: | |
parent_at += '/' | |
return self._next(parent_at) | |