File size: 9,773 Bytes
3362754
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
import email.message
import email.parser
import logging
import os
import zipfile
from typing import Collection, Iterable, Iterator, List, Mapping, NamedTuple, Optional

from pip._vendor import pkg_resources
from pip._vendor.packaging.requirements import Requirement
from pip._vendor.packaging.utils import NormalizedName, canonicalize_name
from pip._vendor.packaging.version import parse as parse_version

from pip._internal.exceptions import InvalidWheel, NoneMetadataError, UnsupportedWheel
from pip._internal.utils.egg_link import egg_link_path_from_location
from pip._internal.utils.misc import display_path, normalize_path
from pip._internal.utils.wheel import parse_wheel, read_wheel_metadata_file

from .base import (
    BaseDistribution,
    BaseEntryPoint,
    BaseEnvironment,
    DistributionVersion,
    InfoPath,
    Wheel,
)

logger = logging.getLogger(__name__)


class EntryPoint(NamedTuple):
    name: str
    value: str
    group: str


class InMemoryMetadata:
    """IMetadataProvider that reads metadata files from a dictionary.

    This also maps metadata decoding exceptions to our internal exception type.
    """

    def __init__(self, metadata: Mapping[str, bytes], wheel_name: str) -> None:
        self._metadata = metadata
        self._wheel_name = wheel_name

    def has_metadata(self, name: str) -> bool:
        return name in self._metadata

    def get_metadata(self, name: str) -> str:
        try:
            return self._metadata[name].decode()
        except UnicodeDecodeError as e:
            # Augment the default error with the origin of the file.
            raise UnsupportedWheel(
                f"Error decoding metadata for {self._wheel_name}: {e} in {name} file"
            )

    def get_metadata_lines(self, name: str) -> Iterable[str]:
        return pkg_resources.yield_lines(self.get_metadata(name))

    def metadata_isdir(self, name: str) -> bool:
        return False

    def metadata_listdir(self, name: str) -> List[str]:
        return []

    def run_script(self, script_name: str, namespace: str) -> None:
        pass


class Distribution(BaseDistribution):
    def __init__(self, dist: pkg_resources.Distribution) -> None:
        self._dist = dist

    @classmethod
    def from_directory(cls, directory: str) -> BaseDistribution:
        dist_dir = directory.rstrip(os.sep)

        # Build a PathMetadata object, from path to metadata. :wink:
        base_dir, dist_dir_name = os.path.split(dist_dir)
        metadata = pkg_resources.PathMetadata(base_dir, dist_dir)

        # Determine the correct Distribution object type.
        if dist_dir.endswith(".egg-info"):
            dist_cls = pkg_resources.Distribution
            dist_name = os.path.splitext(dist_dir_name)[0]
        else:
            assert dist_dir.endswith(".dist-info")
            dist_cls = pkg_resources.DistInfoDistribution
            dist_name = os.path.splitext(dist_dir_name)[0].split("-")[0]

        dist = dist_cls(base_dir, project_name=dist_name, metadata=metadata)
        return cls(dist)

    @classmethod
    def from_metadata_file_contents(
        cls,
        metadata_contents: bytes,
        filename: str,
        project_name: str,
    ) -> BaseDistribution:
        metadata_dict = {
            "METADATA": metadata_contents,
        }
        dist = pkg_resources.DistInfoDistribution(
            location=filename,
            metadata=InMemoryMetadata(metadata_dict, filename),
            project_name=project_name,
        )
        return cls(dist)

    @classmethod
    def from_wheel(cls, wheel: Wheel, name: str) -> BaseDistribution:
        try:
            with wheel.as_zipfile() as zf:
                info_dir, _ = parse_wheel(zf, name)
                metadata_dict = {
                    path.split("/", 1)[-1]: read_wheel_metadata_file(zf, path)
                    for path in zf.namelist()
                    if path.startswith(f"{info_dir}/")
                }
        except zipfile.BadZipFile as e:
            raise InvalidWheel(wheel.location, name) from e
        except UnsupportedWheel as e:
            raise UnsupportedWheel(f"{name} has an invalid wheel, {e}")
        dist = pkg_resources.DistInfoDistribution(
            location=wheel.location,
            metadata=InMemoryMetadata(metadata_dict, wheel.location),
            project_name=name,
        )
        return cls(dist)

    @property
    def location(self) -> Optional[str]:
        return self._dist.location

    @property
    def installed_location(self) -> Optional[str]:
        egg_link = egg_link_path_from_location(self.raw_name)
        if egg_link:
            location = egg_link
        elif self.location:
            location = self.location
        else:
            return None
        return normalize_path(location)

    @property
    def info_location(self) -> Optional[str]:
        return self._dist.egg_info

    @property
    def installed_by_distutils(self) -> bool:
        # A distutils-installed distribution is provided by FileMetadata. This
        # provider has a "path" attribute not present anywhere else. Not the
        # best introspection logic, but pip has been doing this for a long time.
        try:
            return bool(self._dist._provider.path)
        except AttributeError:
            return False

    @property
    def canonical_name(self) -> NormalizedName:
        return canonicalize_name(self._dist.project_name)

    @property
    def version(self) -> DistributionVersion:
        return parse_version(self._dist.version)

    def is_file(self, path: InfoPath) -> bool:
        return self._dist.has_metadata(str(path))

    def iter_distutils_script_names(self) -> Iterator[str]:
        yield from self._dist.metadata_listdir("scripts")

    def read_text(self, path: InfoPath) -> str:
        name = str(path)
        if not self._dist.has_metadata(name):
            raise FileNotFoundError(name)
        content = self._dist.get_metadata(name)
        if content is None:
            raise NoneMetadataError(self, name)
        return content

    def iter_entry_points(self) -> Iterable[BaseEntryPoint]:
        for group, entries in self._dist.get_entry_map().items():
            for name, entry_point in entries.items():
                name, _, value = str(entry_point).partition("=")
                yield EntryPoint(name=name.strip(), value=value.strip(), group=group)

    def _metadata_impl(self) -> email.message.Message:
        """
        :raises NoneMetadataError: if the distribution reports `has_metadata()`
            True but `get_metadata()` returns None.
        """
        if isinstance(self._dist, pkg_resources.DistInfoDistribution):
            metadata_name = "METADATA"
        else:
            metadata_name = "PKG-INFO"
        try:
            metadata = self.read_text(metadata_name)
        except FileNotFoundError:
            if self.location:
                displaying_path = display_path(self.location)
            else:
                displaying_path = repr(self.location)
            logger.warning("No metadata found in %s", displaying_path)
            metadata = ""
        feed_parser = email.parser.FeedParser()
        feed_parser.feed(metadata)
        return feed_parser.close()

    def iter_dependencies(self, extras: Collection[str] = ()) -> Iterable[Requirement]:
        if extras:  # pkg_resources raises on invalid extras, so we sanitize.
            extras = frozenset(extras).intersection(self._dist.extras)
        return self._dist.requires(extras)

    def iter_provided_extras(self) -> Iterable[str]:
        return self._dist.extras


class Environment(BaseEnvironment):
    def __init__(self, ws: pkg_resources.WorkingSet) -> None:
        self._ws = ws

    @classmethod
    def default(cls) -> BaseEnvironment:
        return cls(pkg_resources.working_set)

    @classmethod
    def from_paths(cls, paths: Optional[List[str]]) -> BaseEnvironment:
        return cls(pkg_resources.WorkingSet(paths))

    def _iter_distributions(self) -> Iterator[BaseDistribution]:
        for dist in self._ws:
            yield Distribution(dist)

    def _search_distribution(self, name: str) -> Optional[BaseDistribution]:
        """Find a distribution matching the ``name`` in the environment.

        This searches from *all* distributions available in the environment, to
        match the behavior of ``pkg_resources.get_distribution()``.
        """
        canonical_name = canonicalize_name(name)
        for dist in self.iter_all_distributions():
            if dist.canonical_name == canonical_name:
                return dist
        return None

    def get_distribution(self, name: str) -> Optional[BaseDistribution]:
        # Search the distribution by looking through the working set.
        dist = self._search_distribution(name)
        if dist:
            return dist

        # If distribution could not be found, call working_set.require to
        # update the working set, and try to find the distribution again.
        # This might happen for e.g. when you install a package twice, once
        # using setup.py develop and again using setup.py install. Now when
        # running pip uninstall twice, the package gets removed from the
        # working set in the first uninstall, so we have to populate the
        # working set again so that pip knows about it and the packages gets
        # picked up and is successfully uninstalled the second time too.
        try:
            # We didn't pass in any version specifiers, so this can never
            # raise pkg_resources.VersionConflict.
            self._ws.require(name)
        except pkg_resources.DistributionNotFound:
            return None
        return self._search_distribution(name)