|
import logging |
|
import shutil |
|
import sys |
|
import textwrap |
|
import xmlrpc.client |
|
from collections import OrderedDict |
|
from optparse import Values |
|
from typing import TYPE_CHECKING, Dict, List, Optional |
|
|
|
from pip._vendor.packaging.version import parse as parse_version |
|
|
|
from pip._internal.cli.base_command import Command |
|
from pip._internal.cli.req_command import SessionCommandMixin |
|
from pip._internal.cli.status_codes import NO_MATCHES_FOUND, SUCCESS |
|
from pip._internal.exceptions import CommandError |
|
from pip._internal.metadata import get_default_environment |
|
from pip._internal.models.index import PyPI |
|
from pip._internal.network.xmlrpc import PipXmlrpcTransport |
|
from pip._internal.utils.logging import indent_log |
|
from pip._internal.utils.misc import write_output |
|
|
|
if TYPE_CHECKING: |
|
from typing import TypedDict |
|
|
|
class TransformedHit(TypedDict): |
|
name: str |
|
summary: str |
|
versions: List[str] |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class SearchCommand(Command, SessionCommandMixin): |
|
"""Search for PyPI packages whose name or summary contains <query>.""" |
|
|
|
usage = """ |
|
%prog [options] <query>""" |
|
ignore_require_venv = True |
|
|
|
def add_options(self) -> None: |
|
self.cmd_opts.add_option( |
|
"-i", |
|
"--index", |
|
dest="index", |
|
metavar="URL", |
|
default=PyPI.pypi_url, |
|
help="Base URL of Python Package Index (default %default)", |
|
) |
|
|
|
self.parser.insert_option_group(0, self.cmd_opts) |
|
|
|
def run(self, options: Values, args: List[str]) -> int: |
|
if not args: |
|
raise CommandError("Missing required argument (search query).") |
|
query = args |
|
pypi_hits = self.search(query, options) |
|
hits = transform_hits(pypi_hits) |
|
|
|
terminal_width = None |
|
if sys.stdout.isatty(): |
|
terminal_width = shutil.get_terminal_size()[0] |
|
|
|
print_results(hits, terminal_width=terminal_width) |
|
if pypi_hits: |
|
return SUCCESS |
|
return NO_MATCHES_FOUND |
|
|
|
def search(self, query: List[str], options: Values) -> List[Dict[str, str]]: |
|
index_url = options.index |
|
|
|
session = self.get_default_session(options) |
|
|
|
transport = PipXmlrpcTransport(index_url, session) |
|
pypi = xmlrpc.client.ServerProxy(index_url, transport) |
|
try: |
|
hits = pypi.search({"name": query, "summary": query}, "or") |
|
except xmlrpc.client.Fault as fault: |
|
message = "XMLRPC request failed [code: {code}]\n{string}".format( |
|
code=fault.faultCode, |
|
string=fault.faultString, |
|
) |
|
raise CommandError(message) |
|
assert isinstance(hits, list) |
|
return hits |
|
|
|
|
|
def transform_hits(hits: List[Dict[str, str]]) -> List["TransformedHit"]: |
|
""" |
|
The list from pypi is really a list of versions. We want a list of |
|
packages with the list of versions stored inline. This converts the |
|
list from pypi into one we can use. |
|
""" |
|
packages: Dict[str, "TransformedHit"] = OrderedDict() |
|
for hit in hits: |
|
name = hit["name"] |
|
summary = hit["summary"] |
|
version = hit["version"] |
|
|
|
if name not in packages.keys(): |
|
packages[name] = { |
|
"name": name, |
|
"summary": summary, |
|
"versions": [version], |
|
} |
|
else: |
|
packages[name]["versions"].append(version) |
|
|
|
|
|
if version == highest_version(packages[name]["versions"]): |
|
packages[name]["summary"] = summary |
|
|
|
return list(packages.values()) |
|
|
|
|
|
def print_dist_installation_info(name: str, latest: str) -> None: |
|
env = get_default_environment() |
|
dist = env.get_distribution(name) |
|
if dist is not None: |
|
with indent_log(): |
|
if dist.version == latest: |
|
write_output("INSTALLED: %s (latest)", dist.version) |
|
else: |
|
write_output("INSTALLED: %s", dist.version) |
|
if parse_version(latest).pre: |
|
write_output( |
|
"LATEST: %s (pre-release; install" |
|
" with `pip install --pre`)", |
|
latest, |
|
) |
|
else: |
|
write_output("LATEST: %s", latest) |
|
|
|
|
|
def print_results( |
|
hits: List["TransformedHit"], |
|
name_column_width: Optional[int] = None, |
|
terminal_width: Optional[int] = None, |
|
) -> None: |
|
if not hits: |
|
return |
|
if name_column_width is None: |
|
name_column_width = ( |
|
max( |
|
[ |
|
len(hit["name"]) + len(highest_version(hit.get("versions", ["-"]))) |
|
for hit in hits |
|
] |
|
) |
|
+ 4 |
|
) |
|
|
|
for hit in hits: |
|
name = hit["name"] |
|
summary = hit["summary"] or "" |
|
latest = highest_version(hit.get("versions", ["-"])) |
|
if terminal_width is not None: |
|
target_width = terminal_width - name_column_width - 5 |
|
if target_width > 10: |
|
|
|
summary_lines = textwrap.wrap(summary, target_width) |
|
summary = ("\n" + " " * (name_column_width + 3)).join(summary_lines) |
|
|
|
name_latest = f"{name} ({latest})" |
|
line = f"{name_latest:{name_column_width}} - {summary}" |
|
try: |
|
write_output(line) |
|
print_dist_installation_info(name, latest) |
|
except UnicodeEncodeError: |
|
pass |
|
|
|
|
|
def highest_version(versions: List[str]) -> str: |
|
return max(versions, key=parse_version) |
|
|