|
""" |
|
The main purpose of this module is to expose LinkCollector.collect_sources(). |
|
""" |
|
|
|
import collections |
|
import email.message |
|
import functools |
|
import itertools |
|
import json |
|
import logging |
|
import os |
|
import urllib.parse |
|
import urllib.request |
|
from html.parser import HTMLParser |
|
from optparse import Values |
|
from typing import ( |
|
TYPE_CHECKING, |
|
Callable, |
|
Dict, |
|
Iterable, |
|
List, |
|
MutableMapping, |
|
NamedTuple, |
|
Optional, |
|
Sequence, |
|
Tuple, |
|
Union, |
|
) |
|
|
|
from pip._vendor import requests |
|
from pip._vendor.requests import Response |
|
from pip._vendor.requests.exceptions import RetryError, SSLError |
|
|
|
from pip._internal.exceptions import NetworkConnectionError |
|
from pip._internal.models.link import Link |
|
from pip._internal.models.search_scope import SearchScope |
|
from pip._internal.network.session import PipSession |
|
from pip._internal.network.utils import raise_for_status |
|
from pip._internal.utils.filetypes import is_archive_file |
|
from pip._internal.utils.misc import redact_auth_from_url |
|
from pip._internal.vcs import vcs |
|
|
|
from .sources import CandidatesFromPage, LinkSource, build_source |
|
|
|
# ``Protocol`` is imported only for static type checkers; at runtime the name
# is bound to ``object`` so classes deriving from it are plain classes.
if TYPE_CHECKING:
    from typing import Protocol
else:
    Protocol = object

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Alias for a mutable mapping of header names to header values.
ResponseHeaders = MutableMapping[str, str]
|
|
|
|
|
def _match_vcs_scheme(url: str) -> Optional[str]:
    """Look for VCS schemes in the URL.

    A scheme matches when the URL starts with it (case-insensitively) and the
    character right after the scheme is ``+`` or ``:``.

    :param url: the URL (or URL-like string) to inspect.
    :return: the matched VCS scheme, or None if there's no match.
    """
    # Lower-casing does not depend on the scheme, so do it once up front.
    lowered_url = url.lower()
    for scheme in vcs.schemes:
        if not lowered_url.startswith(scheme):
            continue
        # Use a slice rather than an index: when the URL is exactly the scheme
        # name (e.g. "git") there is no following character, and indexing
        # would raise IndexError. The slice yields "" in that case, which is
        # compared against a tuple so that the empty string never matches.
        separator = url[len(scheme) : len(scheme) + 1]
        if separator in ("+", ":"):
            return scheme
    return None
|
|
|
|
|
class _NotAPIContent(Exception):
    """Raised when a response's Content-Type is not an accepted Simple API type.

    Both constructor arguments are forwarded to ``Exception`` (so they appear
    in ``args``) and are also stored as attributes of the same name.
    """

    def __init__(self, content_type: str, request_desc: str) -> None:
        super().__init__(content_type, request_desc)
        self.content_type, self.request_desc = content_type, request_desc
|
|
|
|
|
def _ensure_api_header(response: Response) -> None:
    """
    Verify that the response's Content-Type header names a Simple API format.

    The check is a case-insensitive prefix match, so parameters following the
    media type (such as a charset) do not prevent a match. A missing header is
    treated as the content type "Unknown".

    Raises `_NotAPIContent`, carrying the header value and the request method,
    if the content type is not one of the accepted types.
    """
    accepted_prefixes = (
        "text/html",
        "application/vnd.pypi.simple.v1+html",
        "application/vnd.pypi.simple.v1+json",
    )
    content_type = response.headers.get("Content-Type", "Unknown")
    if not content_type.lower().startswith(accepted_prefixes):
        raise _NotAPIContent(content_type, response.request.method)
|
|
|
|
|
class _NotHTTP(Exception):
    """Raised when a URL's scheme is neither http nor https."""
|
|
|
|
|
def _ensure_api_response(url: str, session: PipSession) -> None:
    """
    Issue a HEAD request for the URL and check that the reply announces a
    Simple API content type.

    Raises `_NotHTTP` if the URL scheme is not http/https (so no HEAD request
    can be made), or `_NotAPIContent` if the content type is not accepted.
    A failing HTTP status is reported through `raise_for_status`.
    """
    if urllib.parse.urlsplit(url).scheme not in {"http", "https"}:
        raise _NotHTTP()

    head_response = session.head(url, allow_redirects=True)
    raise_for_status(head_response)
    _ensure_api_header(head_response)
|
|
|
|
|
def _get_simple_response(url: str, session: PipSession) -> Response:
    """Access a Simple API response with GET, and return the response.

    The work happens in three steps:

    1. When the URL's filename looks like an archive, a HEAD request is sent
       first so that a large file is not downloaded by mistake. This raises
       `_NotHTTP` if the URL cannot be checked with HEAD, or `_NotAPIContent`
       if the announced content type is not HTML or a Simple API type.
    2. The GET request itself is performed; HTTP errors are raised through
       `raise_for_status`.
    3. The Content-Type of the GET response is checked, raising
       `_NotAPIContent` if it is not an accepted type.
    """
    if is_archive_file(Link(url).filename):
        _ensure_api_response(url, session=session)

    # The q-values rank the JSON Simple API first, then its HTML flavour, and
    # plain HTML last.
    accepted_types = [
        "application/vnd.pypi.simple.v1+json",
        "application/vnd.pypi.simple.v1+html; q=0.1",
        "text/html; q=0.01",
    ]
    request_headers = {
        "Accept": ", ".join(accepted_types),
        # max-age=0 asks caches along the way not to serve a stale copy.
        "Cache-Control": "max-age=0",
    }

    logger.debug("Getting page %s", redact_auth_from_url(url))
    response = session.get(url, headers=request_headers)
    raise_for_status(response)

    _ensure_api_header(response)

    logger.debug(
        "Fetched page %s as %s",
        redact_auth_from_url(url),
        response.headers.get("Content-Type", "Unknown"),
    )
    return response
|
|
|
|
|
def _get_encoding_from_headers(headers: ResponseHeaders) -> Optional[str]:
    """Return the charset declared in the Content-Type header, if there is one.

    The header value is parsed with ``email.message.Message`` so the
    ``charset`` parameter can be read with ``get_param``. Returns None when
    the headers are empty, have no Content-Type, or declare no charset.
    """
    if not headers or "Content-Type" not in headers:
        return None

    message = email.message.Message()
    message["content-type"] = headers["Content-Type"]
    charset = message.get_param("charset")
    return str(charset) if charset else None
|
|
|
|
|
class CacheablePageContent:
    """Hashable wrapper around a page, compared and hashed by the page's URL.

    Only pages with ``cache_link_parsing`` set may be wrapped; this is
    enforced with an assertion at construction time.
    """

    def __init__(self, page: "IndexContent") -> None:
        assert page.cache_link_parsing
        self.page = page

    def __hash__(self) -> int:
        return hash(self.page.url)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, type(self)):
            return False
        return self.page.url == other.page.url
|
|
|
|
|
class ParseLinks(Protocol):
    """Call signature of a link parser: one page in, an iterable of links out."""

    def __call__(self, page: "IndexContent") -> Iterable[Link]:
        """Parse ``page`` and produce the ``Link`` objects found in it."""
|
|
|
|
|
def with_cached_index_content(fn: ParseLinks) -> ParseLinks:
    """
    Wrap a function that parses an Iterable[Link] out of an IndexContent so
    that its result is memoized (keyed by CacheablePageContent, i.e. by page
    URL), except for pages whose `page.cache_link_parsing` is false.

    The wrapped function always returns a list. For cached pages the same
    list object is handed back on every call with an equal key.
    """

    @functools.lru_cache(maxsize=None)
    def parse_cached(cacheable_page: CacheablePageContent) -> List[Link]:
        return list(fn(cacheable_page.page))

    @functools.wraps(fn)
    def parse_maybe_cached(page: "IndexContent") -> List[Link]:
        if not page.cache_link_parsing:
            # Caching disabled for this page: parse it afresh every time.
            return list(fn(page))
        return parse_cached(CacheablePageContent(page))

    return parse_maybe_cached
|
|
|
|
|
@with_cached_index_content
def parse_links(page: "IndexContent") -> Iterable[Link]:
    """
    Parse a Simple API's Index Content, and yield the files it lists as Link objects.

    JSON pages (content type ``application/vnd.pypi.simple.v1+json``) are read
    from their ``files`` array; every other page is parsed as HTML and its
    anchor elements are used. Entries for which no Link can be built are
    skipped.
    """
    page_url = page.url

    if page.content_type.lower().startswith("application/vnd.pypi.simple.v1+json"):
        payload = json.loads(page.content)
        for file_entry in payload.get("files", []):
            json_link = Link.from_json(file_entry, page_url)
            if json_link is not None:
                yield json_link
        return

    html_parser = HTMLLinkParser(page_url)
    # Fall back to UTF-8 when the response did not declare an encoding.
    html_parser.feed(page.content.decode(page.encoding or "utf-8"))

    # A <base href> in the document, when present, overrides the page URL
    # as the base for the anchors.
    base_url = html_parser.base_url or page_url
    for anchor in html_parser.anchors:
        html_link = Link.from_element(anchor, page_url=page_url, base_url=base_url)
        if html_link is not None:
            yield html_link
|
|
|
|
|
class IndexContent:
    """A single fetched response (page) together with the URL it came from."""

    def __init__(
        self,
        content: bytes,
        content_type: str,
        encoding: Optional[str],
        url: str,
        cache_link_parsing: bool = True,
    ) -> None:
        """
        :param content: the raw body of the response.
        :param content_type: the value of the response's Content-Type header.
        :param encoding: the encoding to decode the given content.
        :param url: the URL from which the content was downloaded.
        :param cache_link_parsing: whether links parsed from this page's url
                                   should be cached. PyPI index urls should
                                   have this set to False, for example.
        """
        self.url = url
        self.content = content
        self.content_type = content_type
        self.encoding = encoding
        self.cache_link_parsing = cache_link_parsing

    def __str__(self) -> str:
        # Never show credentials embedded in the URL.
        return redact_auth_from_url(self.url)
|
|
|
|
|
class HTMLLinkParser(HTMLParser):
    """
    HTMLParser subclass that records the href of the first <base> element
    carrying one, plus the attribute dictionaries of every <a> element.
    """

    def __init__(self, url: str) -> None:
        super().__init__(convert_charrefs=True)

        self.url: str = url
        self.base_url: Optional[str] = None
        self.anchors: List[Dict[str, Optional[str]]] = []

    def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]) -> None:
        if tag == "a":
            self.anchors.append(dict(attrs))
        elif tag == "base" and self.base_url is None:
            # Only the first <base> with a usable href is remembered.
            href = self.get_href(attrs)
            if href is not None:
                self.base_url = href

    def get_href(self, attrs: List[Tuple[str, Optional[str]]]) -> Optional[str]:
        """Return the value of the first ``href`` attribute, or None if absent."""
        return next((value for name, value in attrs if name == "href"), None)
|
|
|
|
|
def _handle_get_simple_fail(
    link: Link,
    reason: Union[str, Exception],
    meth: Optional[Callable[..., None]] = None,
) -> None:
    """Log that ``link`` could not be fetched and is being skipped.

    :param link: the link whose fetch failed.
    :param reason: a message or exception describing the failure.
    :param meth: the logging callable to use; defaults to ``logger.debug``.
    """
    log = logger.debug if meth is None else meth
    log("Could not fetch URL %s: %s - skipping", link, reason)
|
|
|
|
|
def _make_index_content(
    response: Response, cache_link_parsing: bool = True
) -> IndexContent:
    """Build an IndexContent from a response's body, headers and URL.

    The encoding is taken from the charset declared in the Content-Type
    header, when one is declared.
    """
    declared_encoding = _get_encoding_from_headers(response.headers)
    return IndexContent(
        content=response.content,
        content_type=response.headers["Content-Type"],
        encoding=declared_encoding,
        url=response.url,
        cache_link_parsing=cache_link_parsing,
    )
|
|
|
|
|
def _get_index_content(link: Link, *, session: PipSession) -> Optional["IndexContent"]:
    """Fetch the index page behind ``link``, or return None if it is skipped.

    VCS URLs are rejected with a warning. A ``file:`` URL that points at a
    directory is redirected to the ``index.html`` inside that directory.
    Unsupported content and network failures are logged and yield None rather
    than propagating.
    """
    # Drop the fragment: it plays no part in fetching the page.
    url, _, _ = link.url.partition("#")

    vcs_scheme = _match_vcs_scheme(url)
    if vcs_scheme:
        logger.warning(
            "Cannot look at %s URL %s because it does not support lookup as web pages.",
            vcs_scheme,
            link,
        )
        return None

    parsed = urllib.parse.urlparse(url)
    if parsed.scheme == "file" and os.path.isdir(
        urllib.request.url2pathname(parsed.path)
    ):
        # The trailing slash makes urljoin append to the directory instead of
        # replacing its last path segment.
        if not url.endswith("/"):
            url += "/"
        url = urllib.parse.urljoin(url, "index.html")
        logger.debug(" file: URL is directory, getting %s", url)

    try:
        resp = _get_simple_response(url, session=session)
    except _NotHTTP:
        logger.warning(
            "Skipping page %s because it looks like an archive, and cannot "
            "be checked by a HTTP HEAD request.",
            link,
        )
    except _NotAPIContent as exc:
        logger.warning(
            "Skipping page %s because the %s request got Content-Type: %s. "
            "The only supported Content-Types are application/vnd.pypi.simple.v1+json, "
            "application/vnd.pypi.simple.v1+html, and text/html",
            link,
            exc.request_desc,
            exc.content_type,
        )
    except (NetworkConnectionError, RetryError) as exc:
        _handle_get_simple_fail(link, exc)
    except SSLError as exc:
        _handle_get_simple_fail(
            link,
            f"There was a problem confirming the ssl certificate: {exc}",
            meth=logger.info,
        )
    except requests.ConnectionError as exc:
        _handle_get_simple_fail(link, f"connection error: {exc}")
    except requests.Timeout:
        _handle_get_simple_fail(link, "timed out")
    else:
        # Built outside the try body so its own errors are not swallowed.
        return _make_index_content(resp, cache_link_parsing=link.cache_link_parsing)
    return None
|
|
|
|
|
class CollectedSources(NamedTuple):
    """The link sources gathered for a project, grouped by where they came from.

    Entries in either sequence may be None.
    """

    find_links: Sequence[Optional[LinkSource]]
    index_urls: Sequence[Optional[LinkSource]]
|
|
|
|
|
class LinkCollector:
    """
    Gathers Link sources from every configured location (index URLs and
    find-links entries), making network requests as needed.

    collect_sources() is the main entry point of the class.
    """

    def __init__(
        self,
        session: PipSession,
        search_scope: SearchScope,
    ) -> None:
        self.session = session
        self.search_scope = search_scope

    @classmethod
    def create(
        cls,
        session: PipSession,
        options: Values,
        suppress_no_index: bool = False,
    ) -> "LinkCollector":
        """
        Build a LinkCollector from parsed command-line options.

        :param session: The Session to use to make requests.
        :param options: The option values supplying index_url,
            extra_index_urls, no_index and find_links.
        :param suppress_no_index: Whether to ignore the --no-index option
            when constructing the SearchScope object.
        """
        index_urls = [options.index_url] + options.extra_index_urls
        if options.no_index and not suppress_no_index:
            ignored = ",".join(redact_auth_from_url(url) for url in index_urls)
            logger.debug("Ignoring indexes: %s", ignored)
            index_urls = []

        search_scope = SearchScope.create(
            find_links=options.find_links or [],
            index_urls=index_urls,
            no_index=options.no_index,
        )
        return LinkCollector(session=session, search_scope=search_scope)

    @property
    def find_links(self) -> List[str]:
        """The find-links entries of the search scope."""
        return self.search_scope.find_links

    def fetch_response(self, location: Link) -> Optional[IndexContent]:
        """
        Fetch the page of package links found at ``location``, using this
        collector's session. Returns None if the page was skipped.
        """
        return _get_index_content(location, session=self.session)

    def collect_sources(
        self,
        project_name: str,
        candidates_from_page: CandidatesFromPage,
    ) -> CollectedSources:
        """
        Build the link sources for ``project_name`` from the index URLs and
        the find-links entries, dropping duplicate locations.

        :param project_name: The project whose index locations are looked up.
        :param candidates_from_page: Passed through to ``build_source``.
        """

        def unique_sources(locations, *, expand_dir, cache_link_parsing):
            # build_source yields (key, source) pairs; feeding them through an
            # OrderedDict removes repeated keys while preserving order.
            pairs = (
                build_source(
                    loc,
                    candidates_from_page=candidates_from_page,
                    page_validator=self.session.is_secure_origin,
                    expand_dir=expand_dir,
                    cache_link_parsing=cache_link_parsing,
                )
                for loc in locations
            )
            return collections.OrderedDict(pairs).values()

        index_url_sources = unique_sources(
            self.search_scope.get_index_urls_locations(project_name),
            expand_dir=False,
            cache_link_parsing=False,
        )
        find_links_sources = unique_sources(
            self.find_links,
            expand_dir=True,
            cache_link_parsing=True,
        )

        if logger.isEnabledFor(logging.DEBUG):
            locations = [
                f"* {source.link}"
                for source in itertools.chain(find_links_sources, index_url_sources)
                if source is not None and source.link is not None
            ]
            header = (
                f"{len(locations)} location(s) to search "
                f"for versions of {project_name}:"
            )
            logger.debug("\n".join([header, *locations]))

        return CollectedSources(
            find_links=list(find_links_sources),
            index_urls=list(index_url_sources),
        )
|
|