"""Logging utilities."""
import asyncio
import logging
import logging.config
import os
import sys
import threading
import traceback
from contextlib import contextmanager
from functools import singledispatch, wraps
from itertools import count
from logging import Logger
from pprint import pprint
from time import asctime
from types import TracebackType
from typing import (
Any,
Callable,
ClassVar,
ContextManager,
Dict,
IO,
Iterable,
List,
Mapping,
NamedTuple,
Optional,
Sequence,
Set,
Tuple,
Type,
Union,
)
from .futures import all_tasks, current_task
from .text import title
from .times import Seconds, want_seconds
from .tracebacks import format_task_stack, print_task_stack
import colorlog
# Public names of this module (what ``from <module> import *`` exports).
# NOTE(review): 'ExtensionFormatter' and 'formatter' are listed here but are
# not defined in the part of the module visible to this review -- confirm
# they exist elsewhere in the file.
__all__ = [
    'CompositeLogger',
    'ExtensionFormatter',
    'FileLogProxy',
    'FormatterHandler',
    'LogSeverityMixin',
    'Logwrapped',
    'Severity',
    'cry',
    'flight_recorder',
    'formatter',
    'get_logger',
    'level_name',
    'level_number',
    'redirect_logger',
    'redirect_stdouts',
    'setup_logging',
]
#: True when the ``DEVLOG`` environment variable is set to a non-empty value.
#: Gates ``LogSeverityMixin.dev``.
DEVLOG: bool = bool(os.environ.get('DEVLOG', ''))

#: Plain log line layout.  The ``%(extra)s`` field is populated by
#: ``DefaultFormatter.format`` before the record is rendered.
DEFAULT_FORMAT: str = '''
[%(asctime)s: %(levelname)s]: %(message)s %(extra)s
'''.strip()

#: Same layout as ``DEFAULT_FORMAT`` with a ``%(log_color)s`` field placed
#: right before the message.
DEFAULT_COLOR_FORMAT = '''
[%(asctime)s: %(levelname)s]: %(log_color)s%(message)s %(extra)s
'''.strip()

#: colorlog's default per-level colors, with INFO and DEBUG overridden.
DEFAULT_COLORS = {
    **colorlog.default_log_colors,
    'INFO': 'white',
    'DEBUG': 'blue',
}

#: Formatter definitions used as the default ``formatters`` argument of
#: ``create_logconfig``.  The ``'()'`` entries name the formatter factory by
#: dotted path.
DEFAULT_FORMATTERS = {
    'default': {
        '()': 'mode.utils.logging.DefaultFormatter',
        'format': DEFAULT_FORMAT,
    },
    'colored': {
        '()': 'mode.utils.logging.ExtensionFormatter',
        'format': DEFAULT_COLOR_FORMAT,
        'log_colors': DEFAULT_COLORS,
        # NOTE(review): presumably handed to the formatter factory as a
        # keyword argument by ``logging.config`` -- confirm.
        'stream': sys.stdout,
    },
}
def _logger_config(handlers: List[str],
                   level: Union[str, int] = 'INFO') -> Dict:
    """Return a mapping describing a logger's handlers and level.

    Arguments:
        handlers: Names of the handlers the logger should use.
        level: Severity for the logger, as a name or a number.

    Returns:
        A dict with the keys ``'handlers'`` and ``'level'``.
    """
    return dict(handlers=handlers, level=level)
def create_logconfig(version: int = 1,
                     disable_existing_loggers: bool = False,
                     formatters: Dict = DEFAULT_FORMATTERS,
                     handlers: Dict = None,
                     root: Dict = None) -> Dict:
    """Assemble a logging configuration dictionary.

    Arguments:
        version: Value stored under ``'version'``.
        disable_existing_loggers: Value stored under
            ``'disable_existing_loggers'``.
        formatters: Formatter definitions (``DEFAULT_FORMATTERS`` by default).
        handlers: Handler definitions; stored as given (``None`` included).
        root: Root logger definition; stored as given (``None`` included).

    Returns:
        A new dict holding the five entries above.
    """
    logconfig: Dict = {'version': version}
    # do not disable existing loggers from other modules.
    # see https://www.caktusgroup.com/blog/2015/01/27/
    # Django-Logging-Configuration-logging_config-default-settings-logger/
    logconfig['disable_existing_loggers'] = disable_existing_loggers
    logconfig['formatters'] = formatters
    logconfig['handlers'] = handlers
    logconfig['root'] = root
    return logconfig
#: Set by ``setup_logging`` if logging target file is a TTY.
LOG_ISATTY: bool = False

#: Type of a callable taking a single value and returning a value.
FormatterHandler = Callable[[Any], Any]

#: A log severity, given either as a number or as a level name
#: (see ``level_number`` / ``level_name``).
Severity = Union[int, str]

#: Set of ``FormatterHandler`` callables; empty at import time.
_formatter_registry: Set[FormatterHandler] = set()
def get_logger(name: str) -> Logger:
    """Return the logger called *name*, making sure it has a handler.

    A :class:`logging.NullHandler` is attached when the logger does not
    have any handlers of its own yet.
    """
    named = logging.getLogger(name)
    if named.handlers:
        return named
    named.addHandler(logging.NullHandler())
    return named
#: Logger used by ``redirect_stdouts`` when no logger is passed explicitly.
redirect_logger = get_logger('mode.redirect')
class LogSeverityMixin:
    """Mixin providing the usual per-severity logging shortcuts.

    Every shortcut forwards to ``self.log``, so the class that mixes
    this in must define the ``log`` method.

    Example:
        >>> class Foo(LogSeverityMixin):
        ...
        ...    logger = get_logger('foo')
        ...
        ...    def log(self,
        ...            severity: int,
        ...            msg: str,
        ...            *args: Any, **kwargs: Any) -> None:
        ...        return self.logger.log(severity, msg, *args, **kwargs)
    """

    def dev(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log at info severity, but only when ``DEVLOG`` is enabled."""
        if not DEVLOG:
            return
        self.info(msg, *args, **kwargs)

    def debug(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log *msg* with severity ``DEBUG``."""
        self.log(logging.DEBUG, msg, *args, **kwargs)

    def info(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log *msg* with severity ``INFO``."""
        self.log(logging.INFO, msg, *args, **kwargs)

    def warn(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log *msg* with severity ``WARNING`` (short spelling)."""
        self.log(logging.WARNING, msg, *args, **kwargs)

    def warning(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log *msg* with severity ``WARNING``."""
        self.log(logging.WARNING, msg, *args, **kwargs)

    def error(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log *msg* with severity ``ERROR``."""
        self.log(logging.ERROR, msg, *args, **kwargs)

    def crit(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log *msg* with severity ``CRITICAL`` (short spelling)."""
        self.log(logging.CRITICAL, msg, *args, **kwargs)

    def critical(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log *msg* with severity ``CRITICAL``."""
        self.log(logging.CRITICAL, msg, *args, **kwargs)

    def exception(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log *msg* with severity ``ERROR``, passing ``exc_info=1``."""
        self.log(logging.ERROR, msg, *args, exc_info=1, **kwargs)
class CompositeLogger(LogSeverityMixin):
    """Composite logger for classes.

    The class can be used as both mixin and composite,
    and may also define a ``.formatter`` attribute
    which will reformat any log messages sent.

    Service uses this to add logging methods:

    .. sourcecode:: python

        class Service(ServiceT):

            log: CompositeLogger

            def __init__(self):
                self.log = CompositeLogger(
                    logger=self.logger,
                    formatter=self._format_log,
                )

            def _format_log(self, severity: int, msg: str,
                            *args: Any, **kwargs: Any) -> str:
                return (f'[^{"-" * (self.beacon.depth - 1)}'
                        f'{self.shortlabel}]: {msg}')

    This means those defining a service may also use it to log:

    .. sourcecode:: pycon

        >>> service.log.info('Something happened')

    and when logging, additional information about the service is
    automatically included.
    """

    logger: Logger

    def __init__(self,
                 logger: Logger,
                 formatter: Optional[Callable[..., str]] = None) -> None:
        """Store the target *logger* and the optional *formatter* callback.

        Arguments:
            logger: Logger that receives every message.
            formatter: Callable invoked as
                ``formatter(severity, msg, *args, **kwargs)`` that returns
                the message to log; ``None`` leaves messages unchanged.
        """
        self.logger = logger
        self.formatter: Optional[Callable[..., str]] = formatter

    def log(self, severity: int, msg: str,
            *args: Any, **kwargs: Any) -> None:
        """Format *msg* and forward it to the wrapped logger."""
        self.logger.log(severity,
                        self.format(severity, msg, *args, **kwargs),
                        *args, **kwargs)

    def format(self, severity: int, msg: str,
               *args: Any, **kwargs: Any) -> str:
        """Return *msg* as rewritten by ``self.formatter``.

        ``log`` relies on this method; without a formatter the message is
        returned unchanged.
        """
        if self.formatter:
            return self.formatter(severity, msg, *args, **kwargs)
        return msg
def _format_extra(record: logging.LogRecord) -> str:
    """Render the record's ``data`` mapping as ``key=repr(value)`` pairs.

    The pairs are joined with ``', '``; a record without a ``data``
    attribute yields the empty string.
    """
    data = vars(record).get('data', {})
    pairs = [f'{key}={value!r}' for key, value in data.items()]
    return ', '.join(pairs)
class DefaultFormatter(logging.Formatter):
    """Formatter that makes extra log data available as ``%(extra)s``."""

    def format(self, record: logging.LogRecord) -> str:
        """Set ``record.extra`` from the record's data, then format it."""
        extra = _format_extra(record)
        record.extra = extra
        rendered = super().format(record)
        return rendered
@singledispatch
def level_name(loglevel: int) -> str:
    """Convert log level to its name.

    Numbers are looked up with :func:`logging.getLevelName`; strings are
    handled by the ``str`` overload, which upper-cases them.
    """
    name = logging.getLevelName(loglevel)
    return name


@level_name.register(str)
def _when_str(loglevel: str) -> str:
    # Already a level name: only normalize the casing.
    upper = loglevel.upper()
    return upper
@singledispatch
def level_number(loglevel: int) -> int:
    """Convert log level to its number.

    Non-string values are returned as they are; strings are handled by
    the ``str`` overload, which upper-cases the name and looks it up with
    :func:`logging.getLevelName`.
    """
    return loglevel


@level_number.register(str)
def _(loglevel: str) -> int:
    # For a registered level name ``getLevelName`` returns the number.
    canonical = loglevel.upper()
    return logging.getLevelName(canonical)  # type: ignore
def setup_logging(
        *,
        loglevel: Union[str, int] = None,
        logfile: Union[str, IO] = None,
        loghandlers: List[logging.StreamHandler] = None,
        logging_config: Dict = None) -> int:
    """Configure logging subsystem.

    Arguments:
        loglevel: Severity as a name or number; converted with
            ``level_number``.
        logfile: A filename (``str``) to log to a file, or a stream
            object.  When omitted, :data:`sys.stdout` is used as the stream.
        loghandlers: Extra handlers passed on to ``_setup_logging``.
        logging_config: Logging configuration passed on to
            ``_setup_logging``.

    Returns:
        The value ``level_number(loglevel)`` evaluated to.
    """
    global LOG_ISATTY
    numeric_level: int = level_number(loglevel)

    stream: IO
    filename: str
    if isinstance(logfile, str):
        filename, stream = logfile, None
    else:
        filename = None
        stream = sys.stdout if logfile is None else logfile

    try:
        LOG_ISATTY = stream.isatty()
    except AttributeError:
        # No stream at all (file target) or a stream without ``isatty``:
        # the flag keeps its previous value.
        pass

    # Drop whatever handlers are currently installed on the root logger.
    logging.root.handlers.clear()
    _setup_logging(
        level=numeric_level,
        filename=filename,
        stream=stream,
        logging_config=logging_config,
        loghandlers=loghandlers,
    )
    return numeric_level
def _setup_logging(*,
                   level: Union[int, str] = None,
                   filename: str = None,
                   stream: IO = None,
                   loghandlers: List[logging.StreamHandler] = None,
                   logging_config: Dict = None) -> None:
    """Build the logging configuration and apply it with ``dictConfig``.

    Arguments:
        level: Severity used for both the ``default`` handler and the
            root logger.
        filename: When set, the ``default`` handler is a
            ``logging.FileHandler`` writing to this file.
        stream: When truthy (and no filename is given), the ``default``
            handler is a ``colorlog.StreamHandler`` using the ``colored``
            formatter.  Only its truthiness is consulted here.
        loghandlers: Handlers appended to the root logger after the
            configuration has been applied.
        logging_config: Custom configuration.  ``None`` selects the
            generated configuration.  A mapping with a true ``'merge'``
            entry is merged on top of the generated configuration; any
            other mapping is used in place of it.  The mapping passed in
            is never modified.
    """
    handlers: Dict[str, Dict[str, Any]] = {}
    if filename:
        # Internal invariant: ``setup_logging`` never passes both.
        assert stream is None
        handlers['default'] = {
            'level': level,
            'class': 'logging.FileHandler',
            'formatter': 'default',
            'filename': filename,
        }
    elif stream:
        handlers['default'] = {
            'level': level,
            'class': 'colorlog.StreamHandler',
            'formatter': 'colored',
        }

    config = create_logconfig(handlers=handlers, root={
        'level': level,
        'handlers': ['default'],
    })

    if logging_config is None:
        logging_config = config
    else:
        # Work on a shallow copy: consuming the ``merge`` flag must not
        # alter the caller's mapping, otherwise reusing the same config
        # for a second call would silently stop merging.
        logging_config = dict(logging_config)
        if logging_config.pop('merge', False):
            logging_config = {**config, **logging_config}
            # Merge the nested sections one level deep so that custom
            # entries extend the generated ones rather than replacing them.
            for k in ('formatters', 'filters', 'handlers', 'loggers', 'root'):
                logging_config[k] = {**config.get(k, {}),
                                     **logging_config.get(k, {})}

    logging.config.dictConfig(logging_config)
    if loghandlers is not None:
        logging.root.handlers.extend(loghandlers)
class Logwrapped(object):
    """Wrap all object methods, to log on call.

    Attribute access is forwarded to the wrapped object.  Callable
    attributes (except those named in ``_ignore``) are returned wrapped in
    a function that first logs the call -- method name, positional and
    keyword arguments -- through ``logger.log(severity, ...)`` and then
    invokes the real method.
    """

    obj: Any
    logger: Any
    severity: int
    ident: str

    # Callables with these names are handed out unwrapped.
    _ignore: ClassVar[Set[str]] = {'__enter__', '__exit__'}

    def __init__(self,
                 obj: Any,
                 logger: Any = None,
                 severity: Severity = None,
                 ident: str = '') -> None:
        """Set up the proxy.

        Arguments:
            obj: Object whose method calls should be logged.
            logger: Object providing ``log(severity, message)``.
            severity: Severity for the call log entries, as a name or a
                number; a falsy value is stored unchanged.
            ident: Optional prefix for each entry; it is expanded with
                ``ident.format(obj)``.
        """
        self.obj = obj
        self.logger = logger
        self.severity = level_number(severity) if severity else severity
        self.ident = ident

    def __getattr__(self, key: str) -> Any:
        # ``__getattr__`` also runs for ``obj`` itself while the instance is
        # not initialized yet (e.g. during ``copy.copy`` or unpickling).
        # Reading it through ``object.__getattribute__`` turns what would be
        # unbounded recursion into a regular ``AttributeError``.
        target = object.__getattribute__(self, 'obj')
        meth = getattr(target, key)
        ignore = object.__getattribute__(self, '_ignore')
        if not callable(meth) or key in ignore:
            return meth

        @wraps(meth)
        def _logged_call(*args: Any, **kwargs: Any) -> Any:
            info = ''
            if self.ident:
                info += self.ident.format(self.obj)
            info += f'{meth.__name__}('
            if args:
                info += ', '.join(map(repr, args))
            if kwargs:
                if args:
                    info += ', '
                info += ', '.join(f'{name}={value!r}'
                                  for name, value in kwargs.items())
            info += ')'
            self.logger.log(self.severity, info)
            return meth(*args, **kwargs)

        return _logged_call

    def __repr__(self) -> str:
        return repr(self.obj)

    def __dir__(self) -> List[str]:
        return dir(self.obj)
def cry(file: IO,
        *,
        sep1: str = '=',
        sep2: str = '-',
        sep3: str = '~',
        seplen: int = 49) -> None:  # pragma: no cover
    """Print stack-trace of all active threads to *file*.

    For every thread the stack and the local variables of its current
    frame are written.  When an event loop is known for the thread, the
    loop's tasks are written as well.

    Arguments:
        file: Stream that receives the output.
        sep1: Separator printed below the thread header.
        sep2: Separator printed around section titles.
        sep3: Separator printed between a task's name and its stack.
        seplen: Width to which one-character separators are repeated;
            longer separators are used verbatim.

    See Also:
        Taken from https://gist.github.com/737056.
    """
    # get a map of threads by their ID so we can print their names
    # during the traceback dump
    tmap = {t.ident: t for t in threading.enumerate()}
    current_thread = threading.current_thread()

    sep1 = sep1 * seplen if len(sep1) == 1 else sep1
    sep2 = sep2 * seplen if len(sep2) == 1 else sep2
    sep3 = sep3 * seplen if len(sep3) == 1 else sep3

    for tid, frame in sys._current_frames().items():
        thread = tmap.get(tid)
        if thread:
            if thread.ident == current_thread.ident:
                try:
                    loop = asyncio.get_event_loop()
                except RuntimeError:
                    # The calling thread has no event loop: still dump
                    # the thread stacks, just without a task section.
                    loop = None
            else:
                loop = getattr(thread, 'loop', None)
            print(f'THREAD {thread.name}', file=file)  # noqa: T003
            print(sep1, file=file)  # noqa: T003
            traceback.print_stack(frame, file=file)
            print(sep2, file=file)  # noqa: T003
            print('LOCAL VARIABLES', file=file)  # noqa: T003
            print(sep2, file=file)  # noqa: T003
            pprint(frame.f_locals, stream=file)
            if loop is not None:
                print('TASKS', file=file)
                print(sep2, file=file)
                for task in all_tasks(loop=loop):
                    print_task_name(task, file=file)
                    print(f' {sep3}', file=file)
                    print_task_stack(task, file=file, capture_locals=True)
            print('\n', file=file)  # noqa: T003
def print_task_name(task: asyncio.Task, file: IO) -> None:
    """Print name of :class:`asyncio.Task` in tracebacks.

    Writes the name of the task's coroutine, then the task's
    ``__wrapped__`` attribute when it has a truthy one, then the
    ``repr`` of the task itself.
    """
    coroutine = task._coro  # type: ignore
    inner = getattr(task, '__wrapped__', None)
    label = getattr(coroutine, '__name__', None)
    # some coroutines does not have a __name__ attribute
    # e.g. async_generator_asend
    label = repr(coroutine) if label is None else label
    print(f' TASK {label}', file=file)
    if inner:
        print(f' -> {inner}', file=file)
    print(f' {task!r}', file=file)
class LogMessage(NamedTuple):
    """Archived log message.

    One buffered logging call, as stored by ``flight_recorder`` while it
    is active.
    """

    # Severity the message was logged with.
    severity: int
    # Message as passed to the logging call.
    message: str
    # Time the message was buffered, as returned by ``time.asctime()``.
    asctime: str
    # Positional arguments passed along with the message.
    args: Tuple[Any, ...]
    # Keyword arguments passed along with the message.
    kwargs: Dict[str, Any]
class flight_recorder(ContextManager, LogSeverityMixin):
    """Flight Recorder context for use with :keyword:`with` statement.

    This is a logging utility to log stuff only when something
    times out.

    For example if you have a background thread that is sometimes
    hanging::

        class RedisCache(mode.Service):

            @mode.timer(1.0)
            async def _background_refresh(self) -> None:
                self._users = await self.redis_client.get(USER_KEY)
                self._posts = await self.redis_client.get(POSTS_KEY)

    You want to figure out on what line this is hanging, but logging
    all the time will provide way too much output, and will even change
    how fast the program runs and that can mask race conditions, so that
    they never happen.

    Use the flight recorder to save the logs and only log when it times out:

    .. sourcecode:: python

        logger = mode.get_logger(__name__)

        class RedisCache(mode.Service):

            @mode.timer(1.0)
            async def _background_refresh(self) -> None:
                with mode.flight_recorder(logger, timeout=10.0) as on_timeout:
                    on_timeout.info(f'+redis_client.get({USER_KEY!r})')
                    await self.redis_client.get(USER_KEY)
                    on_timeout.info(f'-redis_client.get({USER_KEY!r})')

                    on_timeout.info(f'+redis_client.get({POSTS_KEY!r})')
                    await self.redis_client.get(POSTS_KEY)
                    on_timeout.info(f'-redis_client.get({POSTS_KEY!r})')

    If the body of this :keyword:`with` statement completes before the
    timeout, the logs are forgotten about and never emitted -- if it
    takes more than ten seconds to complete, we will see these messages
    in the log:

    .. sourcecode:: text

        [2018-04-19 09:43:55,877: WARNING]: Warning: Task timed out!
        [2018-04-19 09:43:55,878: WARNING]:
            Please make sure it's hanging before restart.
        [2018-04-19 09:43:55,878: INFO]: [Flight Recorder-1]
            (started at Thu Apr 19 09:43:45 2018) Replaying logs...
        [2018-04-19 09:43:55,878: INFO]: [Flight Recorder-1]
            (Thu Apr 19 09:43:45 2018) +redis_client.get('user')
        [2018-04-19 09:43:55,878: INFO]: [Flight Recorder-1]
            (Thu Apr 19 09:43:49 2018) -redis_client.get('user')
        [2018-04-19 09:43:55,878: INFO]: [Flight Recorder-1]
            (Thu Apr 19 09:43:46 2018) +redis_client.get('posts')
        [2018-04-19 09:43:55,878: INFO]: [Flight Recorder-1] -End of log-

    Now we know this ``redis_client.get`` call can take too long to complete,
    and should consider adding a timeout to it.

    While the recorder is active, messages logged through it are buffered;
    when it is not active they are passed straight to ``logger``.
    """

    # Shared counter handing out a distinct id to every recorder.
    _id_source: ClassVar[Iterable[int]] = count(1)

    logger: Any
    timeout: float
    loop: asyncio.AbstractEventLoop
    started_at_date: Optional[str]
    enabled_by: Optional[asyncio.Task]

    # Future of the pending timeout; ``None`` while the recorder is inactive.
    _fut: Optional[asyncio.Future]
    # Messages buffered since activation.
    _logs: List[LogMessage]

    def __init__(self, logger: Any, *,
                 timeout: Seconds,
                 loop: asyncio.AbstractEventLoop = None) -> None:
        """Create an inactive recorder.

        Arguments:
            logger: Logger that receives the replayed messages.
            timeout: Time to wait before the buffered messages are
                replayed; converted with ``want_seconds``.
            loop: Event loop used to schedule the timeout; defaults to
                ``asyncio.get_event_loop()``.
        """
        self.id = next(self._id_source)
        self.logger = logger
        self.timeout = want_seconds(timeout)
        self.loop = loop or asyncio.get_event_loop()
        self.started_at_date = None
        self.enabled_by = None
        self._fut = None
        self._logs = []

    def wrap_debug(self, obj: Any) -> Logwrapped:
        """Wrap *obj* so its method calls are logged here as ``DEBUG``."""
        return self.wrap(logging.DEBUG, obj)

    def wrap_info(self, obj: Any) -> Logwrapped:
        """Wrap *obj* so its method calls are logged here as ``INFO``."""
        return self.wrap(logging.INFO, obj)

    def wrap_warn(self, obj: Any) -> Logwrapped:
        """Wrap *obj* so its method calls are logged here as ``WARN``."""
        return self.wrap(logging.WARN, obj)

    def wrap_error(self, obj: Any) -> Logwrapped:
        """Wrap *obj* so its method calls are logged here as ``ERROR``."""
        return self.wrap(logging.ERROR, obj)

    def wrap(self, severity: int, obj: Any) -> Logwrapped:
        """Wrap *obj* so its method calls are logged to this recorder."""
        return Logwrapped(logger=self, severity=severity, obj=obj)

    def activate(self) -> None:
        """Start the timeout and begin buffering log messages.

        Raises:
            RuntimeError: If the recorder is already active.
        """
        if self._fut:
            raise RuntimeError(f'{type(self).__name__} already activated')
        self.enabled_by = current_task()
        self.started_at_date = asctime()
        self._fut = asyncio.ensure_future(self._waiting(), loop=self.loop)

    def cancel(self) -> None:
        """Stop the timeout and forget all buffered messages."""
        fut, self._fut = self._fut, None
        self._logs.clear()
        if fut is not None:
            fut.cancel()

    def log(self, severity: int, message: str,
            *args: Any, **kwargs: Any) -> None:
        """Buffer the message while active, otherwise log it directly."""
        if self._fut:
            self._buffer_log(severity, message, args, kwargs)
        else:
            self.logger.log(severity, message, *args, **kwargs)

    def _buffer_log(self, severity: int, message: str,
                    args: Any, kwargs: Any) -> None:
        # Stamp the entry with the time it was buffered.
        log = LogMessage(severity, message, asctime(), args, kwargs)
        self._logs.append(log)

    async def _waiting(self) -> None:
        # Sleep for the timeout; if not cancelled first, replay the
        # buffered messages and the stack of the activating task.
        try:
            await asyncio.sleep(self.timeout)
        except asyncio.CancelledError:
            pass
        else:
            # Bound outside the ``try`` so that the error handler below can
            # always refer to it.
            logger = self.logger
            try:
                ident = self._ident()
                logger.warning('Warning: Task timed out!')
                logger.warning(
                    "Please make sure it's hanging before restart.")
                logger.info('[%s] (started at %s) Replaying logs...',
                            ident, self.started_at_date)
                if self._logs:
                    for sev, msg, datestr, args, kwargs in self._logs:
                        logger.log(
                            sev, f'[%s] (%s) {msg}', ident, datestr,
                            *args, **kwargs)
                    logger.info('[%s] -End of log-', ident)
                logger.info('[%s] Task traceback', ident)
                if self.enabled_by is not None:
                    logger.info(format_task_stack(self.enabled_by))
                else:
                    logger.info('[%s] -missing-: not enabled by task', ident)
            except Exception as exc:
                logger.exception('Flight recorder internal error: %r', exc)
                raise

    def _ident(self) -> str:
        return f'{title(type(self).__name__)}-{self.id}'

    def __repr__(self) -> str:
        return f'<{self._ident()} @{id(self):#x}>'

    def __enter__(self) -> 'flight_recorder':
        self.activate()
        return self

    def __exit__(self,
                 exc_type: Type[BaseException] = None,
                 exc_val: BaseException = None,
                 exc_tb: TracebackType = None) -> Optional[bool]:
        # Leaving the block in time: drop the buffer, never suppress errors.
        self.cancel()
        return None
class FileLogProxy:
    """File-like object that forwards data to logger.

    Everything written is stripped of surrounding whitespace and, unless
    empty, logged with the proxy's severity.
    """

    mode: str = 'w'
    name: str = None
    closed: bool = False
    severity: int = logging.WARN

    # Holds the per-thread ``recurse_protection`` flag used by ``write``.
    _threadlocal: threading.local = threading.local()

    def __init__(self, logger: Logger, *, severity: Severity = None) -> None:
        """Bind the proxy to *logger*.

        The severity is taken from *severity* when given, else from the
        logger's own level when that is set, else the class default applies.
        """
        self.logger = logger
        if severity:
            self.severity = level_number(severity)
        elif self.logger.level:
            self.severity = self.logger.level
        self._safewrap_handlers()

    def _safewrap_handlers(self):
        # Patch every handler currently attached to the logger.
        for attached in self.logger.handlers:
            self._safewrap_handler(attached)

    def _safewrap_handler(self, handler: logging.Handler) -> None:
        # Make the logger handlers dump internal errors to
        # :data:`sys.__stderr__` instead of :data:`sys.stderr` to circumvent
        # infinite loops.
        class WithSafeHandleError(logging.Handler):

            def handleError(self, record: logging.LogRecord) -> None:
                try:
                    traceback.print_exc(None, sys.__stderr__)
                except IOError:
                    pass  # see python issue 5971

        safe_handler = WithSafeHandleError()
        handler.handleError = safe_handler.handleError

    def write(self, data: Any) -> None:
        """Log *data* unless it is blank or the proxy is closed."""
        local = self._threadlocal
        if getattr(local, 'recurse_protection', False):
            # Called again from within our own logging call: drop it.
            return
        data = data.strip()
        if not data or self.closed:
            return
        local.recurse_protection = True
        try:
            self.logger.log(self.severity, data)
        finally:
            local.recurse_protection = False

    def writelines(self, lines: Sequence[str]) -> None:
        """Write every item of *lines* with ``write``."""
        for entry in lines:
            self.write(entry)

    def flush(self) -> None:
        """Do nothing; present for file-object compatibility."""
        ...

    def close(self) -> None:
        """Mark the proxy closed; later writes are ignored."""
        self.closed = True

    def isatty(self) -> bool:
        """Return ``False``."""
        return False
@contextmanager
def redirect_stdouts(logger: Logger = redirect_logger, *,
                     severity: Severity = None,
                     stdout: bool = True,
                     stderr: bool = True) -> ContextManager[FileLogProxy]:
    """Redirect :data:`sys.stdout` and :data:`sys.stderr` to logger.

    Arguments:
        logger: Logger receiving the redirected output.
        severity: Severity used for the forwarded output; see
            ``FileLogProxy``.
        stdout: Redirect :data:`sys.stdout` when true.
        stderr: Redirect :data:`sys.stderr` when true.

    Yields:
        The ``FileLogProxy`` installed in place of the streams.

    On exit the streams that were installed when the block was entered
    are put back, so an enclosing redirection stays in effect.
    """
    proxy = FileLogProxy(logger, severity=severity)
    # Remember what is installed right now: restoring ``sys.__stdout__``
    # would discard redirections set up by the caller.
    previous_stdout, previous_stderr = sys.stdout, sys.stderr
    if stdout:
        sys.stdout = proxy
    if stderr:
        sys.stderr = proxy
    try:
        yield proxy
    finally:
        if stdout:
            sys.stdout = previous_stdout
        if stderr:
            sys.stderr = previous_stderr