Source code for mode.utils.logging

"""Logging utilities."""
import asyncio
import logging
import logging.config
import os
import sys
import threading
import traceback
from contextlib import contextmanager
from functools import singledispatch, wraps
from itertools import count
from logging import Logger
from pprint import pprint
from time import asctime
from types import TracebackType
from typing import (
    Any,
    Callable,
    ClassVar,
    ContextManager,
    Dict,
    IO,
    Iterable,
    List,
    Mapping,
    NamedTuple,
    Optional,
    Sequence,
    Set,
    Tuple,
    Type,
    Union,
)
from .futures import all_tasks, current_task
from .text import title
from .times import Seconds, want_seconds
from .tracebacks import format_task_stack, print_task_stack

import colorlog

# Names exported by a star-import of this module.
__all__ = [
    'CompositeLogger',
    'ExtensionFormatter',
    'FileLogProxy',
    'FormatterHandler',
    'LogSeverityMixin',
    'Logwrapped',
    'Severity',
    'cry',
    'flight_recorder',
    'formatter',
    'get_logger',
    'level_name',
    'level_number',
    'redirect_logger',
    'redirect_stdouts',
    'setup_logging',
]

#: True when the ``DEVLOG`` environment variable is set to a non-empty value.
DEVLOG: bool = bool(os.environ.get('DEVLOG', ''))
#: Plain log line layout, used by the ``'default'`` formatter below.
#: ``%(extra)s`` is filled in by the formatters defined in this module.
DEFAULT_FORMAT: str = '''
[%(asctime)s: %(levelname)s]: %(message)s %(extra)s
'''.strip()

#: Same layout with a ``%(log_color)s`` placeholder in front of the message,
#: used by the ``'colored'`` formatter below.
DEFAULT_COLOR_FORMAT = '''
[%(asctime)s: %(levelname)s]: %(log_color)s%(message)s %(extra)s
'''.strip()


#: colorlog's default level colors, with INFO and DEBUG overridden.
DEFAULT_COLORS = {
    **colorlog.default_log_colors,
    'INFO': 'white',
    'DEBUG': 'blue',
}


#: Default ``formatters`` section for :func:`create_logconfig`.
#: The ``'()'`` key names the class used to build each formatter.
DEFAULT_FORMATTERS = {
    'default': {
        '()': 'mode.utils.logging.DefaultFormatter',
        'format': DEFAULT_FORMAT,
    },
    'colored': {
        '()': 'mode.utils.logging.ExtensionFormatter',
        'format': DEFAULT_COLOR_FORMAT,
        'log_colors': DEFAULT_COLORS,
        # NOTE: evaluated once at import time, so this is whatever
        # sys.stdout was when the module was loaded.
        'stream': sys.stdout,
    },
}


def _logger_config(handlers: List[str],
                   level: Union[str, int] = 'INFO') -> Dict:
    """Build a logger configuration entry.

    Arguments:
        handlers: Names of the handlers to attach.
        level: Log level, as a name or a number.

    Returns:
        Dict with the keys ``'handlers'`` and ``'level'``.
    """
    return dict(handlers=handlers, level=level)


def create_logconfig(version: int = 1,
                     disable_existing_loggers: bool = False,
                     formatters: Dict = DEFAULT_FORMATTERS,
                     handlers: Dict = None,
                     root: Dict = None) -> Dict:
    """Assemble a logging configuration dictionary.

    Every argument is stored unchanged under the key of the same name.

    Arguments:
        version: Value for the ``'version'`` key.
        disable_existing_loggers: Value for the
            ``'disable_existing_loggers'`` key.
        formatters: Formatter definitions (defaults to the module-level
            ``DEFAULT_FORMATTERS``, which is shared and not copied).
        handlers: Handler definitions (stored as :const:`None` if omitted).
        root: Root logger definition (stored as :const:`None` if omitted).

    Returns:
        The assembled configuration dict.
    """
    config: Dict = {'version': version}
    # do not disable existing loggers from other modules.
    # see https://www.caktusgroup.com/blog/2015/01/27/
    #    Django-Logging-Configuration-logging_config-default-settings-logger/
    config['disable_existing_loggers'] = disable_existing_loggers
    config['formatters'] = formatters
    config['handlers'] = handlers
    config['root'] = root
    return config


#: Set by ``setup_logging`` if logging target file is a TTY.
LOG_ISATTY: bool = False

#: Callback registered with :func:`formatter`: receives one log argument and
#: returns its replacement (a :const:`None` result leaves it unchanged).
FormatterHandler = Callable[[Any], Any]
#: A log level given either as a number or as a level name.
Severity = Union[int, str]

#: Callbacks added through :func:`formatter`.
_formatter_registry: Set[FormatterHandler] = set()


def get_logger(name: str) -> Logger:
    """Return the logger called ``name``.

    A :class:`logging.NullHandler` is attached when the logger has no
    handlers of its own yet.
    """
    named = logging.getLogger(name)
    if named.handlers:
        return named
    named.addHandler(logging.NullHandler())
    return named
#: Default logger used by :func:`redirect_stdouts`.
redirect_logger = get_logger('mode.redirect')
class LogSeverityMixin:
    """Mixin adding one convenience method per log severity.

    Every method forwards to ``self.log``, so the class that mixes
    this in must define the ``log`` method.

    Example:
        >>> class Foo(LogSeverityMixin):
        ...
        ...    logger = get_logger('foo')
        ...
        ...    def log(self,
        ...            severity: int,
        ...            msg: str,
        ...            *args: Any, **kwargs: Any) -> None:
        ...        return self.logger.log(severity, msg, *args, **kwargs)
    """

    def dev(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log at INFO severity, but only when ``DEVLOG`` is enabled."""
        if not DEVLOG:
            return
        self.info(msg, *args, **kwargs)

    def debug(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log ``msg`` with severity DEBUG."""
        self.log(logging.DEBUG, msg, *args, **kwargs)

    def info(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log ``msg`` with severity INFO."""
        self.log(logging.INFO, msg, *args, **kwargs)

    def warn(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log ``msg`` with severity WARNING (same as :meth:`warning`)."""
        self.log(logging.WARNING, msg, *args, **kwargs)

    def warning(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log ``msg`` with severity WARNING."""
        self.log(logging.WARNING, msg, *args, **kwargs)

    def error(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log ``msg`` with severity ERROR."""
        self.log(logging.ERROR, msg, *args, **kwargs)

    def crit(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log ``msg`` with severity CRITICAL (same as :meth:`critical`)."""
        self.log(logging.CRITICAL, msg, *args, **kwargs)

    def critical(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log ``msg`` with severity CRITICAL."""
        self.log(logging.CRITICAL, msg, *args, **kwargs)

    def exception(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log ``msg`` with severity ERROR, passing ``exc_info=1`` along."""
        self.log(logging.ERROR, msg, *args, exc_info=1, **kwargs)
class CompositeLogger(LogSeverityMixin):
    """Logger wrapper that can reformat messages before they are logged.

    Usable both as a mixin and as a composite.  When a ``formatter``
    callable is given, each message is passed through it before being
    handed to the wrapped logger.

    Service uses this to add logging methods:

    .. sourcecode:: python

        class Service(ServiceT):

            log: CompositeLogger

            def __init__(self):
                self.log = CompositeLogger(
                    logger=self.logger,
                    formatter=self._format_log,
                )

            def _format_log(self, severity: int, msg: str,
                            *args: Any, **kwargs: Any) -> str:
                return (f'[^{"-" * (self.beacon.depth - 1)}'
                        f'{self.shortlabel}]: {msg}')

    This means those defining a service may also use it to log:

    .. sourcecode:: pycon

        >>> service.log.info('Something happened')

    and when logging, additional information about the service is
    automatically included.
    """

    logger: Logger

    def __init__(self,
                 logger: Logger,
                 formatter: Callable[..., str] = None) -> None:
        self.logger = logger
        self.formatter: Callable[..., str] = formatter

    def log(self, severity: int, msg: str,
            *args: Any, **kwargs: Any) -> None:
        """Format ``msg`` and send it to the wrapped logger."""
        formatted = self.format(severity, msg, *args, **kwargs)
        self.logger.log(severity, formatted, *args, **kwargs)

    def format(self, severity: int, msg: str,
               *args: Any, **kwargs: Any) -> str:
        """Return ``msg`` run through the formatter, if one is set."""
        custom = self.formatter
        if not custom:
            return msg
        return custom(severity, msg, *args, **kwargs)
def formatter(fun: FormatterHandler) -> FormatterHandler:
    """Register ``fun`` as a formatter for logging positional args.

    Returns ``fun`` itself, so this can be used as a decorator.
    """
    registry = _formatter_registry
    registry.add(fun)
    return fun
def _format_extra(record: logging.LogRecord) -> str:
    """Render the record's ``data`` mapping as ``key=value`` pairs.

    Returns an empty string when the record carries no ``data`` attribute.
    """
    extra_data = vars(record).get('data', {})
    pairs = [f'{key}={value!r}' for key, value in extra_data.items()]
    return ', '.join(pairs)


class DefaultFormatter(logging.Formatter):
    """Formatter that makes extra data available as ``%(extra)s``."""

    def format(self, record: logging.LogRecord) -> str:
        """Set ``record.extra``, then format the record as usual."""
        rendered_extra = _format_extra(record)
        record.extra = rendered_extra
        return super().format(record)
class ExtensionFormatter(colorlog.TTYColoredFormatter):
    """Formatter that can register callbacks to format args.

    Every log argument is passed through the callbacks registered with
    :func:`formatter` before the record is formatted.

    Extends :pypi:`colorlog`.
    """

    def __init__(self, stream: IO = None, **kwargs: Any) -> None:
        # Fall back to sys.stdout when no (or a falsy) stream is given.
        super().__init__(stream=stream or sys.stdout, **kwargs)

    def format(self, record: logging.LogRecord) -> str:
        """Reformat the record's args, set ``record.extra`` and format."""
        self._format_args(record)
        record.extra = _format_extra(record)
        return super().format(record)

    def _format_args(self, record: logging.LogRecord) -> None:
        """Replace ``record.args`` with the callback-processed values."""
        if record.args is None:
            # Records whose args were cleared (e.g. after passing through
            # a queue handler) must stay without args: wrapping None into
            # ``(None,)`` would make ``msg % args`` fail for messages that
            # contain no placeholder.
            return
        if isinstance(record.args, Mapping):
            # logger.log(severity, "msg %(foo)s", foo=303)
            record.args = {
                k: self._format_arg(v) for k, v in record.args.items()
            }
        else:
            if not isinstance(record.args, tuple):
                # logger.log(severity, "msg %s", foo)
                record.args = (record.args,)
            # logger.log(severity, "msg %s", ('foo',))
            record.args = tuple(
                self._format_arg(arg) for arg in record.args
            )

    def _format_arg(self, arg: Any) -> Any:
        """Return ``arg`` after applying every registered callback."""
        # Reduce value by calling all registered formatters.
        # A callback returning None leaves the current value untouched.
        for fun in _formatter_registry:
            res = fun(arg)
            if res is not None:
                arg = res
        return arg
@singledispatch
def level_name(loglevel: int) -> str:
    """Convert log level to its name.

    Numbers are looked up with :func:`logging.getLevelName`;
    strings are returned upper-cased.
    """
    name = logging.getLevelName(loglevel)
    return name


@level_name.register(str)
def _when_str(loglevel: str) -> str:
    # Already a name: only normalize the case.
    uppercased = loglevel.upper()
    return uppercased
@singledispatch
def level_number(loglevel: int) -> int:
    """Convert log level to its number.

    Non-string values are returned unchanged; strings are upper-cased
    and looked up with :func:`logging.getLevelName`.
    """
    return loglevel


@level_number.register(str)
def _(loglevel: str) -> int:
    # getLevelName maps a registered level name back to its number.
    upper_name = loglevel.upper()
    return logging.getLevelName(upper_name)  # type: ignore
def setup_logging(
        *,
        loglevel: Union[str, int] = None,
        logfile: Union[str, IO] = None,
        loghandlers: List[logging.StreamHandler] = None,
        logging_config: Dict = None) -> int:
    """Configure logging subsystem.

    Clears the root logger's handlers, then applies the configuration
    built by ``_setup_logging``.

    Arguments:
        loglevel: Level name or number; converted with ``level_number``.
        logfile: A string is passed on as the log file name.  Any other
            value is treated as the stream to log to, where
            :const:`None` means :data:`sys.stdout`.
        loghandlers: Extra handlers, passed on to ``_setup_logging``.
        logging_config: Logging configuration dict, passed on to
            ``_setup_logging``.

    Returns:
        The log level as returned by ``level_number``.
    """
    # NOTE(review): block nesting below was reconstructed from a flattened
    # copy of this file -- confirm against the original source.
    stream: IO = None
    _loglevel: int = level_number(loglevel)
    if not isinstance(logfile, str):
        stream, logfile = logfile, None
        if stream is None:
            stream = sys.stdout
    global LOG_ISATTY
    try:
        LOG_ISATTY = stream.isatty()
    except AttributeError:
        # stream is None when logging to a file name (or the object has no
        # isatty method); LOG_ISATTY keeps its previous value.
        pass
    logging.root.handlers.clear()
    _setup_logging(
        level=_loglevel,
        filename=logfile,
        stream=stream,
        logging_config=logging_config,
        loghandlers=loghandlers,
    )
    return _loglevel
def _setup_logging(*,
                   level: Union[int, str] = None,
                   filename: str = None,
                   stream: IO = None,
                   loghandlers: List[logging.StreamHandler] = None,
                   logging_config: Dict = None) -> None:
    """Build the default logging configuration and apply it.

    Arguments:
        level: Level for the default handler and the root logger.
        filename: Log to this file through a ``logging.FileHandler``.
            Must not be combined with ``stream``.
        stream: When set (and no ``filename`` is given) log through a
            ``colorlog.StreamHandler`` using the ``'colored'`` formatter.
        loghandlers: Handlers appended to the root logger after the
            configuration has been applied.
        logging_config: Custom configuration.  Used as-is unless it
            contains a true ``'merge'`` key, in which case it is merged on
            top of the default configuration.  The caller's dict is never
            modified.

    Raises:
        ValueError: If neither ``filename`` nor ``stream`` is given.
    """
    handlers: Dict
    if filename:
        assert stream is None
        handlers = {
            'default': {
                'level': level,
                'class': 'logging.FileHandler',
                'formatter': 'default',
                'filename': filename,
            },
        }
    elif stream:
        # NOTE(review): ``stream`` only selects this branch; it is not
        # passed to the handler, which therefore uses its own default
        # stream -- confirm this is intended.
        handlers = {
            'default': {
                'level': level,
                'class': 'colorlog.StreamHandler',
                'formatter': 'colored',
            },
        }
    else:
        # Previously this fell through and failed with UnboundLocalError.
        raise ValueError(
            'Either filename or stream is required to set up logging')
    config = create_logconfig(handlers=handlers, root={
        'level': level,
        'handlers': ['default'],
    })
    if logging_config is None:
        logging_config = config
    else:
        # Shallow copy: popping 'merge' must not alter the caller's dict,
        # otherwise reusing the same dict would silently lose the flag.
        logging_config = dict(logging_config)
        if logging_config.pop('merge', False):
            logging_config = {**config, **logging_config}
            # Merge the nested sections too, custom entries taking priority.
            for k in ('formatters', 'filters', 'handlers', 'loggers', 'root'):
                logging_config[k] = {**config.get(k, {}),
                                     **logging_config.get(k, {})}
    logging.config.dictConfig(logging_config)
    if loghandlers is not None:
        logging.root.handlers.extend(loghandlers)
class Logwrapped(object):
    """Wrap all object methods, to log on call.

    Attribute access is delegated to the wrapped object.  Callable
    attributes are returned as wrappers that first log the call
    (method name and arguments) and then perform it.

    Arguments:
        obj: Object to wrap.
        logger: Object whose ``log(severity, message)`` is called for
            every wrapped call.
        severity: Level name or number used when logging calls.
        ident: Prefix for each logged call; ``str.format`` is applied to
            it with the wrapped object as the only argument.
    """

    obj: Any
    logger: Any
    severity: int
    ident: str

    # Names returned as-is, without the logging wrapper.
    _ignore: ClassVar[Set[str]] = {'__enter__', '__exit__'}

    def __init__(self,
                 obj: Any,
                 logger: Any = None,
                 severity: Optional['Severity'] = None,
                 ident: str = '') -> None:
        self.obj = obj
        self.logger = logger
        self.severity = level_number(severity) if severity else severity
        self.ident = ident

    def __getattr__(self, key: str) -> Any:
        meth = getattr(self.obj, key)

        # This must read ``_ignore``: a double-underscore name is mangled
        # to ``_Logwrapped__ignore`` inside the class body, which does not
        # exist and made every callable lookup raise AttributeError.
        if not callable(meth) or key in self._ignore:
            return meth

        @wraps(meth)
        def _logged_call(*args: Any, **kwargs: Any) -> Any:
            call_args = [repr(arg) for arg in args]
            call_args.extend(
                f'{name}={value!r}' for name, value in kwargs.items())
            prefix = self.ident.format(self.obj) if self.ident else ''
            info = f'{prefix}{meth.__name__}({", ".join(call_args)})'
            self.logger.log(self.severity, info)
            return meth(*args, **kwargs)

        return _logged_call

    def __repr__(self) -> str:
        return repr(self.obj)

    def __dir__(self) -> List[str]:
        return dir(self.obj)
def cry(file: IO, *,
        sep1: str = '=',
        sep2: str = '-',
        sep3: str = '~',
        seplen: int = 49) -> None:
    """Print stack-trace of all active threads to ``file``.

    For every known thread this writes the stack, the local variables
    of its current frame and -- when an event loop is found for the
    thread -- the name and stack of each task on that loop.

    Arguments:
        file: File-like object to write to.
        sep1: Separator printed below each thread header.
        sep2: Separator printed around section headers.
        sep3: Separator printed between a task name and its stack.
        seplen: A one-character separator is repeated this many times.

    See Also:
        Taken from https://gist.github.com/737056.
    """
    # NOTE(review): block nesting below was reconstructed from a flattened
    # copy of this file -- confirm against the original source.

    # get a map of threads by their ID so we can print their names
    # during the traceback dump
    tmap = {t.ident: t for t in threading.enumerate()}
    current_thread = threading.current_thread()

    # Single characters are expanded to full-width separator lines;
    # longer strings are used as given.
    sep1 = sep1 * seplen if len(sep1) == 1 else sep1
    sep2 = sep2 * seplen if len(sep2) == 1 else sep2
    sep3 = sep3 * seplen if len(sep3) == 1 else sep3

    for tid, frame in sys._current_frames().items():
        thread = tmap.get(tid)
        if thread:
            # For the calling thread ask asyncio for the loop; for other
            # threads use their ``loop`` attribute, if they have one.
            if thread.ident == current_thread.ident:
                loop = asyncio.get_event_loop()
            else:
                loop = getattr(thread, 'loop', None)
            print(f'THREAD {thread.name}', file=file)  # noqa: T003
            print(sep1, file=file)  # noqa: T003
            traceback.print_stack(frame, file=file)
            print(sep2, file=file)  # noqa: T003
            print('LOCAL VARIABLES', file=file)  # noqa: T003
            print(sep2, file=file)  # noqa: T003
            pprint(frame.f_locals, stream=file)
            if loop is not None:
                print('TASKS', file=file)
                print(sep2, file=file)
                for task in all_tasks(loop=loop):
                    print_task_name(task, file=file)
                    print(f' {sep3}', file=file)
                    print_task_stack(task, file=file, capture_locals=True)
            print('\n', file=file)  # noqa: T003
def print_task_name(task: asyncio.Task, file: IO) -> None:
    """Print name of :class:`asyncio.Task` in tracebacks.

    Writes the coroutine name, the task's ``__wrapped__`` attribute
    (when set and true) and the task's repr, one per line.
    """
    def emit(line: str) -> None:
        print(line, file=file)

    coro = task._coro  # type: ignore
    wrapped = getattr(task, '__wrapped__', None)
    try:
        coro_name = coro.__name__
    except AttributeError:
        coro_name = None
    if coro_name is None:
        # some coroutines does not have a __name__ attribute
        # e.g. async_generator_asend
        coro_name = repr(coro)
    emit(f' TASK {coro_name}')
    if wrapped:
        emit(f' -> {wrapped}')
    emit(f' {task!r}')


class LogMessage(NamedTuple):
    """Archived log message."""

    # Fields mirror the arguments of a logging call, plus the time it was
    # recorded.
    severity: int
    message: str
    asctime: str
    args: Tuple[Any, ...]
    kwargs: Dict[str, Any]
class flight_recorder(ContextManager, LogSeverityMixin):
    """Flight Recorder context for use with :keyword:`with` statement.

    This is a logging utility to log stuff only when something
    times out.

    For example if you have a background thread that is sometimes
    hanging::

        class RedisCache(mode.Service):

            @mode.timer(1.0)
            async def _background_refresh(self) -> None:
                self._users = await self.redis_client.get(USER_KEY)
                self._posts = await self.redis_client.get(POSTS_KEY)

    You want to figure out on what line this is hanging, but logging
    all the time will provide way too much output, and will even change
    how fast the program runs and that can mask race conditions, so that
    they never happen.

    Use the flight recorder to save the logs and only log when it times out:

    .. sourcecode:: python

        logger = mode.get_logger(__name__)

        class RedisCache(mode.Service):

            @mode.timer(1.0)
            async def _background_refresh(self) -> None:
                with mode.flight_recorder(logger, timeout=10.0) as on_timeout:
                    on_timeout.info(f'+redis_client.get({USER_KEY!r})')
                    await self.redis_client.get(USER_KEY)
                    on_timeout.info(f'-redis_client.get({USER_KEY!r})')

                    on_timeout.info(f'+redis_client.get({POSTS_KEY!r})')
                    await self.redis_client.get(POSTS_KEY)
                    on_timeout.info(f'-redis_client.get({POSTS_KEY!r})')

    If the body of this :keyword:`with` statement completes before the
    timeout, the logs are forgotten about and never emitted -- if it
    takes more than ten seconds to complete, we will see these messages
    in the log:

    .. sourcecode:: text

        [2018-04-19 09:43:55,877: WARNING]: Warning: Task timed out!
        [2018-04-19 09:43:55,878: WARNING]:
            Please make sure it's hanging before restart.
        [2018-04-19 09:43:55,878: INFO]: [Flight Recorder-1]
            (started at Thu Apr 19 09:43:45 2018) Replaying logs...
        [2018-04-19 09:43:55,878: INFO]: [Flight Recorder-1]
            (Thu Apr 19 09:43:45 2018) +redis_client.get('user')
        [2018-04-19 09:43:55,878: INFO]: [Flight Recorder-1]
            (Thu Apr 19 09:43:49 2018) -redis_client.get('user')
        [2018-04-19 09:43:55,878: INFO]: [Flight Recorder-1]
            (Thu Apr 19 09:43:46 2018) +redis_client.get('posts')
        [2018-04-19 09:43:55,878: INFO]: [Flight Recorder-1] -End of log-

    Now we know this ``redis_client.get`` call can take too long to complete,
    and should consider adding a timeout to it.

    Arguments:
        logger: Logger that receives the replayed messages on timeout
            (and messages logged while the recorder is not active).
        timeout: Time to wait before the buffered logs are replayed.
        loop: Event loop to schedule the timeout on; defaults to
            :func:`asyncio.get_event_loop`.
    """

    # Shared counter giving every recorder instance a unique id.
    _id_source: ClassVar[Iterable[int]] = count(1)

    logger: Any
    timeout: float
    loop: asyncio.AbstractEventLoop
    started_at_date: str
    # Task that was current when the recorder was activated, if any.
    enabled_by: Optional[asyncio.Task]

    _fut: asyncio.Future
    _logs: List[LogMessage]

    def __init__(self, logger: Any, *,
                 timeout: Seconds,
                 loop: asyncio.AbstractEventLoop = None) -> None:
        self.id = next(self._id_source)
        self.logger = logger
        self.timeout = want_seconds(timeout)
        self.loop = loop or asyncio.get_event_loop()
        self.started_at_date = None
        self.enabled_by = None
        self._fut = None
        self._logs = []

    def wrap_debug(self, obj: Any) -> Logwrapped:
        """Wrap ``obj`` so that its method calls are recorded as DEBUG."""
        return self.wrap(logging.DEBUG, obj)

    def wrap_info(self, obj: Any) -> Logwrapped:
        """Wrap ``obj`` so that its method calls are recorded as INFO."""
        return self.wrap(logging.INFO, obj)

    def wrap_warn(self, obj: Any) -> Logwrapped:
        """Wrap ``obj`` so that its method calls are recorded as WARN."""
        return self.wrap(logging.WARN, obj)

    def wrap_error(self, obj: Any) -> Logwrapped:
        """Wrap ``obj`` so that its method calls are recorded as ERROR."""
        return self.wrap(logging.ERROR, obj)

    def wrap(self, severity: int, obj: Any) -> Logwrapped:
        """Wrap ``obj`` so its method calls are logged to this recorder."""
        return Logwrapped(logger=self, severity=severity, obj=obj)

    def activate(self) -> None:
        """Start buffering logs and schedule the timeout.

        Raises:
            RuntimeError: If the recorder is already active.
        """
        if self._fut:
            # The f-prefix was missing, so the placeholder was printed
            # literally instead of the class name.
            raise RuntimeError(f'{type(self).__name__} already activated')
        self.enabled_by = current_task()
        self.started_at_date = asctime()
        self._fut = asyncio.ensure_future(self._waiting(), loop=self.loop)

    def cancel(self) -> None:
        """Stop the timeout and forget all buffered logs."""
        fut, self._fut = self._fut, None
        self._logs.clear()
        if fut is not None:
            fut.cancel()

    def log(self, severity: int, message: str,
            *args: Any, **kwargs: Any) -> None:
        """Buffer the message while active, otherwise log it right away."""
        if self._fut:
            self._buffer_log(severity, message, args, kwargs)
        else:
            self.logger.log(severity, message, *args, **kwargs)

    def _buffer_log(self, severity: int, message: str,
                    args: Any, kwargs: Any) -> None:
        # Remember when the message was recorded, for the replay.
        log = LogMessage(severity, message, asctime(), args, kwargs)
        self._logs.append(log)

    async def _waiting(self) -> None:
        """Sleep for the timeout, then replay the buffered logs.

        Cancellation (the normal case, see :meth:`cancel`) ends the wait
        silently.
        """
        # NOTE(review): block nesting in this method was reconstructed from
        # a flattened copy of this file -- confirm against the original.
        try:
            await asyncio.sleep(self.timeout)
        except asyncio.CancelledError:
            pass
        else:
            try:
                logger = self.logger
                ident = self._ident()
                logger.warning('Warning: Task timed out!')
                logger.warning(
                    'Please make sure it\'s hanging before restart.')
                logger.info('[%s] (started at %s) Replaying logs...',
                            ident, self.started_at_date)
                if self._logs:
                    for sev, msg, datestr, args, kwargs in self._logs:
                        logger.log(
                            sev, f'[%s] (%s) {msg}',
                            ident, datestr, *args, **kwargs)
                logger.info('[%s] -End of log-', ident)
                logger.info('[%s] Task traceback', ident)
                if self.enabled_by is not None:
                    logger.info(format_task_stack(self.enabled_by))
                else:
                    # ``ident`` was missing here, leaving a raw '%s' in
                    # the emitted message.
                    logger.info('[%s] -missing-: not enabled by task', ident)
            except Exception as exc:
                logger.exception('Flight recorder internal error: %r', exc)
                raise

    def _ident(self) -> str:
        return f'{title(type(self).__name__)}-{self.id}'

    def __repr__(self) -> str:
        return f'<{self._ident()} @{id(self):#x}>'

    def __enter__(self) -> 'flight_recorder':
        self.activate()
        return self

    def __exit__(self,
                 exc_type: Type[BaseException] = None,
                 exc_val: BaseException = None,
                 exc_tb: TracebackType = None) -> Optional[bool]:
        self.cancel()
        # Returning None lets any exception from the body propagate.
        return None
class FileLogProxy:
    """File-like object that forwards data to logger.

    Arguments:
        logger: Logger receiving everything written to this object.
        severity: Level name or number to log with.  When not given the
            logger's own level is used if it is set, else the class
            default.
    """

    mode: str = 'w'
    name: str = None
    closed: bool = False
    severity: int = logging.WARNING
    # Shared by all proxies; holds the per-thread recursion flag.
    _threadlocal: threading.local = threading.local()

    def __init__(self, logger: Logger, *,
                 severity: Severity = None) -> None:
        self.logger = logger
        if severity:
            self.severity = level_number(severity)
        elif self.logger.level:
            self.severity = self.logger.level
        self._safewrap_handlers()

    def _safewrap_handlers(self):
        for log_handler in self.logger.handlers:
            self._safewrap_handler(log_handler)

    def _safewrap_handler(self, handler: logging.Handler) -> None:
        # Make the logger handlers dump internal errors to
        # :data:`sys.__stderr__` instead of :data:`sys.stderr` to circumvent
        # infinite loops.
        class WithSafeHandleError(logging.Handler):

            def handleError(self, record: logging.LogRecord) -> None:
                try:
                    traceback.print_exc(None, sys.__stderr__)
                except IOError:
                    pass    # see python issue 5971

        safe_handler = WithSafeHandleError()
        handler.handleError = safe_handler.handleError

    def write(self, data: Any) -> None:
        """Log ``data`` (stripped), unless empty, closed or re-entered."""
        local = self._threadlocal
        if getattr(local, 'recurse_protection', False):
            # Already inside write() on this thread: drop the data.
            return
        data = data.strip()
        if not data or self.closed:
            return
        local.recurse_protection = True
        try:
            self.logger.log(self.severity, data)
        finally:
            local.recurse_protection = False

    def writelines(self, lines: Sequence[str]) -> None:
        """Write every item of ``lines`` with :meth:`write`."""
        for line in lines:
            self.write(line)

    def flush(self) -> None:
        """Do nothing; present for file-object compatibility."""
        ...

    def close(self) -> None:
        """Mark the proxy closed; later writes are dropped."""
        self.closed = True

    def isatty(self) -> bool:
        """Always return :const:`False`."""
        return False
@contextmanager
def redirect_stdouts(logger: Logger = redirect_logger, *,
                     severity: Severity = None,
                     stdout: bool = True,
                     stderr: bool = True) -> ContextManager[FileLogProxy]:
    """Redirect :data:`sys.stdout` and :data:`sys.stderr` to logger.

    On exit the streams that were active when the context was entered
    are put back, so nested or pre-existing redirections survive.

    Arguments:
        logger: Logger receiving the redirected output.
        severity: Level name or number to log the output with.
        stdout: Redirect :data:`sys.stdout`.
        stderr: Redirect :data:`sys.stderr`.

    Yields:
        The :class:`FileLogProxy` installed in place of the stream(s).
    """
    proxy = FileLogProxy(logger, severity=severity)
    # Remember the current streams: restoring sys.__stdout__/__stderr__
    # instead would discard any redirection already in place.
    previous_stdout, previous_stderr = sys.stdout, sys.stderr
    if stdout:
        sys.stdout = proxy
    if stderr:
        sys.stderr = proxy
    try:
        yield proxy
    finally:
        if stdout:
            sys.stdout = previous_stdout
        if stderr:
            sys.stderr = previous_stderr