|
import contextlib |
|
import errno |
|
import logging |
|
import logging.handlers |
|
import os |
|
import sys |
|
import threading |
|
from dataclasses import dataclass |
|
from io import TextIOWrapper |
|
from logging import Filter |
|
from typing import Any, ClassVar, Generator, List, Optional, TextIO, Type |
|
|
|
from pip._vendor.rich.console import ( |
|
Console, |
|
ConsoleOptions, |
|
ConsoleRenderable, |
|
RenderableType, |
|
RenderResult, |
|
RichCast, |
|
) |
|
from pip._vendor.rich.highlighter import NullHighlighter |
|
from pip._vendor.rich.logging import RichHandler |
|
from pip._vendor.rich.segment import Segment |
|
from pip._vendor.rich.style import Style |
|
|
|
from pip._internal.utils._log import VERBOSE, getLogger |
|
from pip._internal.utils.compat import WINDOWS |
|
from pip._internal.utils.deprecation import DEPRECATION_MSG_PREFIX |
|
from pip._internal.utils.misc import ensure_dir |
|
|
|
# Dedicated logger whose name setup_logging() uses to build the
# "restrict_to_subprocess" / "exclude_subprocess" filters.
subprocess_logger = getLogger("pip.subprocessor")

# Thread-local holder for the current indentation level, written by
# indent_log() and read by get_indentation().
_log_state = threading.local()
|
|
|
|
|
class BrokenStdoutLoggingError(Exception):
    """Signals that a broken pipe was hit on stdout while emitting a log record."""
|
|
|
|
|
def _is_broken_pipe_error(exc_class: Type[BaseException], exc: BaseException) -> bool:
    """Tell whether the given exception represents a broken pipe.

    :param exc_class: The exception's class (as returned by ``sys.exc_info()``).
    :param exc: The exception instance.
    :return: True when ``exc_class`` is exactly ``BrokenPipeError``; on
        Windows, also True for an ``OSError`` whose ``errno`` is ``EINVAL``
        or ``EPIPE``. False otherwise.
    """
    if exc_class is BrokenPipeError:
        return True

    # Only on Windows are EINVAL / EPIPE OSErrors treated as broken pipes too.
    if WINDOWS and isinstance(exc, OSError):
        return exc.errno in (errno.EINVAL, errno.EPIPE)

    return False
|
|
|
|
|
@contextlib.contextmanager
def indent_log(num: int = 2) -> Generator[None, None, None]:
    """Indent log output emitted inside the ``with`` body.

    :param num: Number of spaces added to the current thread's indentation
        for the duration of the block; removed again on exit, even if the
        body raises.
    """
    # Go through get_indentation() so the attribute is created on this
    # thread's state the first time it is used.
    _log_state.indentation = get_indentation() + num
    try:
        yield
    finally:
        _log_state.indentation -= num
|
|
|
|
|
def get_indentation() -> int:
    """Return the calling thread's current log indentation (0 if never set)."""
    try:
        return _log_state.indentation
    except AttributeError:
        # Nothing has been stored for this thread yet.
        return 0
|
|
|
|
|
class IndentingFormatter(logging.Formatter):
    """Formatter that indents every output line by the current log indentation.

    Warning and error records are additionally labelled with a
    ``WARNING: `` / ``ERROR: `` prefix, and each line may optionally be
    prefixed with the record's timestamp.
    """

    default_time_format = "%Y-%m-%dT%H:%M:%S"

    def __init__(
        self,
        *args: Any,
        add_timestamp: bool = False,
        **kwargs: Any,
    ) -> None:
        """
        Build a formatter that follows the indent_log() context manager.

        :param add_timestamp: When true, every output line starts with the
            timestamp of the record it belongs to.
        """
        self.add_timestamp = add_timestamp
        super().__init__(*args, **kwargs)

    def get_message_start(self, formatted: str, levelno: int) -> str:
        """
        Compute the label placed in front of the formatted message (this is
        separate from the per-line prefix).

        :param formatted: The message as produced by the base formatter.
        :param levelno: Numeric level of the record.
        :return: ``""`` below WARNING or when the message already starts
            with the deprecation prefix, otherwise ``"WARNING: "`` below
            ERROR and ``"ERROR: "`` from ERROR upwards.
        """
        # Messages that already carry the deprecation prefix get no label.
        if levelno < logging.WARNING or formatted.startswith(DEPRECATION_MSG_PREFIX):
            return ""
        return "WARNING: " if levelno < logging.ERROR else "ERROR: "

    def format(self, record: logging.LogRecord) -> str:
        """
        Format the record with the base formatter, then prepend the optional
        timestamp and the current indentation to every line of the result.
        """
        text = super().format(record)
        text = self.get_message_start(text, record.levelno) + text

        stamp = f"{self.formatTime(record)} " if self.add_timestamp else ""
        line_prefix = stamp + " " * get_indentation()
        # keepends=True so the original line terminators survive the join.
        return "".join(line_prefix + piece for piece in text.splitlines(True))
|
|
|
|
|
@dataclass
class IndentedRenderable:
    """Wraps a rich renderable so that each rendered line is left-padded."""

    # The renderable to draw.
    renderable: RenderableType
    # Number of spaces placed in front of every rendered line.
    indent: int

    def __rich_console__(
        self, console: Console, options: ConsoleOptions
    ) -> RenderResult:
        """Yield the wrapped renderable's segments, one indented line at a time.

        Each line is emitted as: a segment of ``indent`` spaces, the line's
        own segments, then a newline segment.
        """
        rendered = console.render(self.renderable, options)
        for row in Segment.split_lines(rendered):
            yield from (Segment(" " * self.indent), *row, Segment("\n"))
|
|
|
|
|
class RichPipStreamHandler(RichHandler):
    """Stream handler that prints log records through a rich ``Console``.

    Records whose message is exactly ``"[present-rich] %s"`` with a single
    positional argument are treated as carrying a rich renderable, which is
    printed indented. All other records are formatted normally and coloured
    by severity.
    """

    # NOTE(review): empty keyword list, presumably to switch off RichHandler's
    # keyword highlighting; confirm against rich's documentation.
    KEYWORDS: ClassVar[Optional[List[str]]] = []

    def __init__(self, stream: Optional[TextIO], no_color: bool) -> None:
        """Create the handler around a new ``Console``.

        :param stream: File object passed to the ``Console`` as ``file``.
        :param no_color: Passed to the ``Console`` as ``no_color``.
        """
        super().__init__(
            console=Console(file=stream, no_color=no_color, soft_wrap=True),
            show_time=False,
            show_level=False,
            show_path=False,
            highlighter=NullHighlighter(),
        )

    def emit(self, record: logging.LogRecord) -> None:
        """Print *record* to the console.

        Any exception raised while printing is routed to ``handleError``.
        """
        style: Optional[Style] = None

        # ``record.args`` is not always a tuple: logging stores a mapping
        # there when a single dict argument is passed, and it may be None.
        # Only the "[present-rich]" form requires a one-element tuple, so the
        # shape check belongs to that detection instead of being asserted
        # for every record.
        args = record.args
        if (
            record.msg == "[present-rich] %s"
            and isinstance(args, tuple)
            and len(args) == 1
        ):
            rich_renderable = args[0]
            assert isinstance(
                rich_renderable, (ConsoleRenderable, RichCast, str)
            ), f"{rich_renderable} is not rich-console-renderable"

            renderable: RenderableType = IndentedRenderable(
                rich_renderable, indent=get_indentation()
            )
        else:
            message = self.format(record)
            renderable = self.render_message(record, message)
            if record.levelno is not None:
                # Errors are printed in red, warnings in yellow.
                if record.levelno >= logging.ERROR:
                    style = Style(color="red")
                elif record.levelno >= logging.WARNING:
                    style = Style(color="yellow")

        try:
            self.console.print(renderable, overflow="ignore", crop=False, style=style)
        except Exception:
            self.handleError(record)

    def handleError(self, record: logging.LogRecord) -> None:
        """Called when logging is unable to log some output.

        :raises BrokenStdoutLoggingError: If the exception currently being
            handled is a broken-pipe error and this handler's console writes
            to ``sys.stdout``. Every other failure is delegated to the base
            class implementation.
        """
        exc_class, exc = sys.exc_info()[:2]

        if (
            exc_class
            and exc
            and self.console.file is sys.stdout
            and _is_broken_pipe_error(exc_class, exc)
        ):
            raise BrokenStdoutLoggingError()

        return super().handleError(record)
|
|
|
|
|
class BetterRotatingFileHandler(logging.handlers.RotatingFileHandler):
    """Rotating file handler that prepares the log directory before opening."""

    def _open(self) -> TextIOWrapper:
        """Run ``ensure_dir`` on the log file's parent directory, then open it."""
        log_dir = os.path.dirname(self.baseFilename)
        ensure_dir(log_dir)
        return super()._open()
|
|
|
|
|
class MaxLevelFilter(Filter):
    """A logging Filter that only lets through records below a given level."""

    def __init__(self, level: int) -> None:
        """
        :param level: Exclusive upper bound; records whose ``levelno`` is
            greater than or equal to this value are rejected.
        """
        # Initialise the base Filter so its standard attributes (``name``,
        # ``nlen``) exist on the instance.
        super().__init__()
        self.level = level

    def filter(self, record: logging.LogRecord) -> bool:
        """Return True when the record's level is strictly below ``level``."""
        return record.levelno < self.level
|
|
|
|
|
class ExcludeLoggerFilter(Filter):
    """
    Filter that rejects records coming from the named logger or any of its
    children, and accepts everything else.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        """Return the negation of the base ``Filter`` decision for *record*."""
        # The base class answers "does this record belong to the named
        # logger's hierarchy?"; this filter wants exactly the opposite.
        belongs_to_logger = super().filter(record)
        return not belongs_to_logger
|
|
|
|
|
def setup_logging(verbosity: int, no_color: bool, user_log_file: Optional[str]) -> int:
    """Configures and sets up all of the logging

    :param verbosity: Requested verbosity. ``>= 2`` selects DEBUG, ``1``
        selects VERBOSE, ``-1`` WARNING, ``-2`` ERROR, ``<= -3`` CRITICAL;
        any other value (i.e. ``0``) selects INFO.
    :param no_color: Forwarded to every console handler.
    :param user_log_file: Path of an additional log file that receives
        DEBUG-level output, or None for no file logging.
    :return: The requested logging level, as its integer value.
    """
    # ``logging.config`` is a submodule that neither ``import logging`` nor
    # ``import logging.handlers`` loads; import it here so this function does
    # not depend on some other module having imported it first.
    from logging.config import dictConfig

    # Translate the verbosity count into a numeric logging level.
    if verbosity >= 2:
        level_number = logging.DEBUG
    elif verbosity == 1:
        level_number = VERBOSE
    elif verbosity == -1:
        level_number = logging.WARNING
    elif verbosity == -2:
        level_number = logging.ERROR
    elif verbosity <= -3:
        level_number = logging.CRITICAL
    else:
        level_number = logging.INFO

    level = logging.getLevelName(level_number)

    # The root logger follows the console level, except that it must let
    # everything through (DEBUG) when a user log file is also being written.
    include_user_log = user_log_file is not None
    if include_user_log:
        additional_log_file = user_log_file
        root_level = "DEBUG"
    else:
        # Placeholder path: the "user_log" handler below is declared with
        # delay=True and is not attached to the root logger in this case.
        additional_log_file = "/dev/null"
        root_level = level

    # NOTE(review): only INFO and ERROR map to "WARNING" here; every other
    # console level (including WARNING and CRITICAL) yields "DEBUG" for the
    # vendored libraries. Confirm that this is intended.
    vendored_log_level = "WARNING" if level in ["INFO", "ERROR"] else "DEBUG"

    # Shorthands used in the configuration dictionary below.
    log_streams = {
        "stdout": "ext://sys.stdout",
        "stderr": "ext://sys.stderr",
    }
    handler_classes = {
        "stream": "pip._internal.utils.logging.RichPipStreamHandler",
        "file": "pip._internal.utils.logging.BetterRotatingFileHandler",
    }
    handlers = ["console", "console_errors", "console_subprocess"] + (
        ["user_log"] if include_user_log else []
    )

    dictConfig(
        {
            "version": 1,
            "disable_existing_loggers": False,
            "filters": {
                "exclude_warnings": {
                    "()": "pip._internal.utils.logging.MaxLevelFilter",
                    "level": logging.WARNING,
                },
                "restrict_to_subprocess": {
                    "()": "logging.Filter",
                    "name": subprocess_logger.name,
                },
                "exclude_subprocess": {
                    "()": "pip._internal.utils.logging.ExcludeLoggerFilter",
                    "name": subprocess_logger.name,
                },
            },
            "formatters": {
                "indent": {
                    "()": IndentingFormatter,
                    "format": "%(message)s",
                },
                "indent_with_timestamp": {
                    "()": IndentingFormatter,
                    "format": "%(message)s",
                    "add_timestamp": True,
                },
            },
            "handlers": {
                # Below-WARNING, non-subprocess records go to stdout.
                "console": {
                    "level": level,
                    "class": handler_classes["stream"],
                    "no_color": no_color,
                    "stream": log_streams["stdout"],
                    "filters": ["exclude_subprocess", "exclude_warnings"],
                    "formatter": "indent",
                },
                # WARNING and above, non-subprocess records go to stderr.
                "console_errors": {
                    "level": "WARNING",
                    "class": handler_classes["stream"],
                    "no_color": no_color,
                    "stream": log_streams["stderr"],
                    "filters": ["exclude_subprocess"],
                    "formatter": "indent",
                },
                # Records from the subprocess logger go to stderr.
                "console_subprocess": {
                    "level": level,
                    "class": handler_classes["stream"],
                    "stream": log_streams["stderr"],
                    "no_color": no_color,
                    "filters": ["restrict_to_subprocess"],
                    "formatter": "indent",
                },
                "user_log": {
                    "level": "DEBUG",
                    "class": handler_classes["file"],
                    "filename": additional_log_file,
                    "encoding": "utf-8",
                    "delay": True,
                    "formatter": "indent_with_timestamp",
                },
            },
            "root": {
                "level": root_level,
                "handlers": handlers,
            },
            "loggers": {"pip._vendor": {"level": vendored_log_level}},
        }
    )

    return level_number
|
|