|
import email.message |
|
import email.parser |
|
import logging |
|
import os |
|
import zipfile |
|
from typing import Collection, Iterable, Iterator, List, Mapping, NamedTuple, Optional |
|
|
|
from pip._vendor import pkg_resources |
|
from pip._vendor.packaging.requirements import Requirement |
|
from pip._vendor.packaging.utils import NormalizedName, canonicalize_name |
|
from pip._vendor.packaging.version import parse as parse_version |
|
|
|
from pip._internal.exceptions import InvalidWheel, NoneMetadataError, UnsupportedWheel |
|
from pip._internal.utils.egg_link import egg_link_path_from_location |
|
from pip._internal.utils.misc import display_path, normalize_path |
|
from pip._internal.utils.wheel import parse_wheel, read_wheel_metadata_file |
|
|
|
from .base import ( |
|
BaseDistribution, |
|
BaseEntryPoint, |
|
BaseEnvironment, |
|
DistributionVersion, |
|
InfoPath, |
|
Wheel, |
|
) |
|
|
|
# Module-level logger, named after this module so records can be filtered by origin.
logger = logging.getLogger(__name__)
|
|
|
|
|
class EntryPoint(NamedTuple):
    """Immutable record describing one entry point of a distribution."""

    # Text on the left-hand side of the entry point definition.
    name: str
    # Text on the right-hand side of the entry point definition.
    value: str
    # Name of the entry point group this entry belongs to.
    group: str
|
|
|
|
|
class InMemoryMetadata:
    """IMetadataProvider that reads metadata files from a dictionary.

    This also maps metadata decoding exceptions to our internal exception type.
    """

    def __init__(self, metadata: Mapping[str, bytes], wheel_name: str) -> None:
        """Store the metadata mapping and the wheel name used in error messages.

        :param metadata: Mapping of metadata file name to its raw bytes.
        :param wheel_name: Identifier of the wheel, interpolated into the
            message raised when a file cannot be decoded.
        """
        self._metadata = metadata
        self._wheel_name = wheel_name

    def has_metadata(self, name: str) -> bool:
        """Return whether ``name`` is a key of the metadata mapping."""
        return name in self._metadata

    def get_metadata(self, name: str) -> str:
        """Return the content stored under ``name`` decoded to text.

        The bytes are decoded with ``bytes.decode()``'s default codec (UTF-8).

        :raises KeyError: if ``name`` is not in the mapping.
        :raises UnsupportedWheel: if the bytes cannot be decoded.
        """
        try:
            return self._metadata[name].decode()
        except UnicodeDecodeError as e:
            # Augment the default error with the origin of the file, and keep
            # the decoding failure attached as the explicit cause.
            raise UnsupportedWheel(
                f"Error decoding metadata for {self._wheel_name}: {e} in {name} file"
            ) from e

    def get_metadata_lines(self, name: str) -> Iterable[str]:
        """Return the decoded content of ``name`` as yielded by
        ``pkg_resources.yield_lines``.
        """
        return pkg_resources.yield_lines(self.get_metadata(name))

    def metadata_isdir(self, name: str) -> bool:
        """Always return False; this provider reports no directories."""
        return False

    def metadata_listdir(self, name: str) -> List[str]:
        """Always return an empty list; this provider reports no listings."""
        return []

    def run_script(self, script_name: str, namespace: str) -> None:
        """Do nothing; running scripts is not implemented by this provider."""
        pass
|
|
|
|
|
class Distribution(BaseDistribution):
    """``BaseDistribution`` implementation wrapping a ``pkg_resources`` distribution."""

    def __init__(self, dist: pkg_resources.Distribution) -> None:
        """Wrap ``dist``; every accessor on this class delegates to it."""
        self._dist = dist

    @classmethod
    def from_directory(cls, directory: str) -> BaseDistribution:
        """Load a distribution from a ``.egg-info`` or ``.dist-info`` directory.

        :param directory: Path to the metadata directory; trailing path
            separators are ignored.
        """
        dist_dir = directory.rstrip(os.sep)

        # The parent directory is handed to pkg_resources as the distribution
        # location, the directory itself as the metadata location.
        base_dir, dist_dir_name = os.path.split(dist_dir)
        metadata = pkg_resources.PathMetadata(base_dir, dist_dir)

        # Pick the pkg_resources class and project name from the suffix.
        if dist_dir.endswith(".egg-info"):
            dist_cls = pkg_resources.Distribution
            dist_name = os.path.splitext(dist_dir_name)[0]
        else:
            assert dist_dir.endswith(".dist-info")
            dist_cls = pkg_resources.DistInfoDistribution
            # Keep only the part of the directory name before the first "-".
            dist_name = os.path.splitext(dist_dir_name)[0].split("-")[0]

        dist = dist_cls(base_dir, project_name=dist_name, metadata=metadata)
        return cls(dist)

    @classmethod
    def from_metadata_file_contents(
        cls,
        metadata_contents: bytes,
        filename: str,
        project_name: str,
    ) -> BaseDistribution:
        """Build a distribution from the raw bytes of a ``METADATA`` file.

        :param metadata_contents: Bytes exposed as the ``METADATA`` file.
        :param filename: Used both as the distribution location and as the
            name reported in metadata decoding errors.
        :param project_name: Project name given to the distribution.
        """
        metadata_dict = {
            "METADATA": metadata_contents,
        }
        dist = pkg_resources.DistInfoDistribution(
            location=filename,
            metadata=InMemoryMetadata(metadata_dict, filename),
            project_name=project_name,
        )
        return cls(dist)

    @classmethod
    def from_wheel(cls, wheel: Wheel, name: str) -> BaseDistribution:
        """Build a distribution from the info directory of a wheel.

        Every archive member under the info directory reported by
        ``parse_wheel`` is read into memory, keyed by its path relative to
        that directory.

        :raises InvalidWheel: if the wheel is not a valid zip file.
        :raises UnsupportedWheel: if parsing or reading the wheel metadata
            raises ``UnsupportedWheel``; the message is prefixed with ``name``.
        """
        try:
            with wheel.as_zipfile() as zf:
                info_dir, _ = parse_wheel(zf, name)
                metadata_dict = {
                    path.split("/", 1)[-1]: read_wheel_metadata_file(zf, path)
                    for path in zf.namelist()
                    if path.startswith(f"{info_dir}/")
                }
        except zipfile.BadZipFile as e:
            raise InvalidWheel(wheel.location, name) from e
        except UnsupportedWheel as e:
            # Chain explicitly, matching the BadZipFile branch above.
            raise UnsupportedWheel(f"{name} has an invalid wheel, {e}") from e
        dist = pkg_resources.DistInfoDistribution(
            location=wheel.location,
            metadata=InMemoryMetadata(metadata_dict, wheel.location),
            project_name=name,
        )
        return cls(dist)

    @property
    def location(self) -> Optional[str]:
        """The ``location`` of the wrapped distribution."""
        return self._dist.location

    @property
    def installed_location(self) -> Optional[str]:
        """Normalized egg-link path if one is found, else normalized location.

        Returns None when no egg-link is found and ``location`` is falsy.
        """
        egg_link = egg_link_path_from_location(self.raw_name)
        if egg_link:
            location = egg_link
        elif self.location:
            location = self.location
        else:
            return None
        return normalize_path(location)

    @property
    def info_location(self) -> Optional[str]:
        """The ``egg_info`` attribute of the wrapped distribution."""
        return self._dist.egg_info

    @property
    def installed_by_distutils(self) -> bool:
        """Whether the metadata provider exposes a truthy ``path`` attribute.

        NOTE(review): presumably only the provider used for
        distutils-installed metadata has ``path`` -- confirm against
        pkg_resources.
        """
        try:
            return bool(self._dist._provider.path)
        except AttributeError:
            return False

    @property
    def canonical_name(self) -> NormalizedName:
        """The project name of the wrapped distribution, canonicalized."""
        return canonicalize_name(self._dist.project_name)

    @property
    def version(self) -> DistributionVersion:
        """The version of the wrapped distribution, parsed by ``packaging``."""
        return parse_version(self._dist.version)

    def is_file(self, path: InfoPath) -> bool:
        """Return whether the distribution has metadata named ``str(path)``."""
        return self._dist.has_metadata(str(path))

    def iter_distutils_script_names(self) -> Iterator[str]:
        """Yield the entries listed under the ``scripts`` metadata directory."""
        yield from self._dist.metadata_listdir("scripts")

    def read_text(self, path: InfoPath) -> str:
        """Return the text content of the metadata file ``path``.

        :raises FileNotFoundError: if the distribution has no such metadata.
        :raises NoneMetadataError: if the metadata is reported as present but
            ``get_metadata()`` returns None.
        """
        name = str(path)
        if not self._dist.has_metadata(name):
            raise FileNotFoundError(name)
        content = self._dist.get_metadata(name)
        if content is None:
            raise NoneMetadataError(self, name)
        return content

    def iter_entry_points(self) -> Iterable[BaseEntryPoint]:
        """Yield an ``EntryPoint`` for every entry in the entry map.

        Name and value are recovered by splitting the string form of each
        pkg_resources entry point at the first ``=`` and stripping whitespace.
        """
        for group, entries in self._dist.get_entry_map().items():
            # Only the values are needed; the name comes from the string form.
            for entry_point in entries.values():
                name, _, value = str(entry_point).partition("=")
                yield EntryPoint(name=name.strip(), value=value.strip(), group=group)

    def _metadata_impl(self) -> email.message.Message:
        """Parse the distribution's core metadata file into a message.

        ``METADATA`` is read for ``DistInfoDistribution`` instances and
        ``PKG-INFO`` otherwise. If the file is missing, a warning is logged
        and an empty document is parsed instead.

        :raises NoneMetadataError: if the distribution reports `has_metadata()`
            True but `get_metadata()` returns None.
        """
        if isinstance(self._dist, pkg_resources.DistInfoDistribution):
            metadata_name = "METADATA"
        else:
            metadata_name = "PKG-INFO"
        try:
            metadata = self.read_text(metadata_name)
        except FileNotFoundError:
            if self.location:
                displaying_path = display_path(self.location)
            else:
                displaying_path = repr(self.location)
            logger.warning("No metadata found in %s", displaying_path)
            metadata = ""
        feed_parser = email.parser.FeedParser()
        feed_parser.feed(metadata)
        return feed_parser.close()

    def iter_dependencies(self, extras: Collection[str] = ()) -> Iterable[Requirement]:
        """Return the requirements of the distribution for ``extras``.

        Requested extras are normalized with ``pkg_resources.safe_extra`` and
        those not declared by the distribution are dropped before they are
        passed to ``requires()``.
        """
        if extras:
            # pkg_resources stores extras in safe_extra() form, so normalize
            # the request first; otherwise a differently spelled but valid
            # extra would be filtered out by the intersection below.
            extras = frozenset(pkg_resources.safe_extra(e) for e in extras)
            # pkg_resources raises on unknown extras, so keep only known ones.
            extras = extras.intersection(self._dist.extras)
        return self._dist.requires(extras)

    def iter_provided_extras(self) -> Iterable[str]:
        """Return the extras reported by the wrapped distribution."""
        return self._dist.extras
|
|
|
|
|
class Environment(BaseEnvironment):
    """``BaseEnvironment`` implementation backed by a ``pkg_resources.WorkingSet``."""

    def __init__(self, ws: pkg_resources.WorkingSet) -> None:
        """Keep ``ws`` as the working set all lookups go through."""
        self._ws = ws

    @classmethod
    def default(cls) -> BaseEnvironment:
        """Create an environment around ``pkg_resources.working_set``."""
        return cls(pkg_resources.working_set)

    @classmethod
    def from_paths(cls, paths: Optional[List[str]]) -> BaseEnvironment:
        """Create an environment around a new ``WorkingSet`` built from ``paths``."""
        working_set = pkg_resources.WorkingSet(paths)
        return cls(working_set)

    def _iter_distributions(self) -> Iterator[BaseDistribution]:
        """Yield each working set entry wrapped in a ``Distribution``."""
        yield from map(Distribution, self._ws)

    def _search_distribution(self, name: str) -> Optional[BaseDistribution]:
        """Find a distribution matching the ``name`` in the environment.

        This searches from *all* distributions available in the environment, to
        match the behavior of ``pkg_resources.get_distribution()``.
        """
        wanted = canonicalize_name(name)
        candidates = (
            candidate
            for candidate in self.iter_all_distributions()
            if candidate.canonical_name == wanted
        )
        # First match wins; None when nothing matches.
        return next(candidates, None)

    def get_distribution(self, name: str) -> Optional[BaseDistribution]:
        """Return the distribution called ``name``, or None if it is not found.

        The environment is searched first. If that fails, ``require()`` is
        called on the working set and the search is repeated once.
        """
        found = self._search_distribution(name)
        if not found:
            try:
                # NOTE(review): presumably this makes the working set pick up
                # a distribution it did not know about yet -- confirm against
                # pkg_resources. Only DistributionNotFound is handled here.
                self._ws.require(name)
            except pkg_resources.DistributionNotFound:
                return None
            found = self._search_distribution(name)
        return found
|
|