Source code for mode.debug

"""Debugging utilities."""
import math
import signal
import traceback
from types import FrameType
from typing import Any, Type

from .services import Service
from .utils.logging import get_logger
from .utils.times import Seconds, want_seconds

__all__ = ['Blocking', 'BlockingDetector']

logger = get_logger(__name__)

if hasattr(signal, 'setitimer'):  # pragma: no cover
    def arm_alarm(seconds: float) -> None:
        signal.setitimer(signal.ITIMER_REAL, seconds)
else:  # pragma: no cover
    try:
        import itimer
    except ImportError:
        def arm_alarm(seconds: float) -> None:
            signal.alarm(math.ceil(seconds))
    else:
        def arm_alarm(seconds: float) -> None:
            itimer(seconds)


[docs]class Blocking(RuntimeError): """Exception raised when event loop is blocked."""
[docs]class BlockingDetector(Service): """Service that detects blocking code using alarm/itimer. Examples: blockdetect = BlockingDetector(timeout=10.0) await blockdetect.start() Keyword Arguments: timeout (Seconds): number of seconds that the event loop can be blocked. raises (Type[BaseException]): Exception to raise when the blocking timeout is exceeded. Defaults to :exc:`Blocking`. """ logger = logger def __init__(self, timeout: Seconds, raises: Type[BaseException] = Blocking, **kwargs: Any) -> None: self.timeout: float = want_seconds(timeout) self.raises: Type[BaseException] = raises super().__init__(**kwargs) @Service.task async def _deadman_switch(self) -> None: try: while not self.should_stop: self._reset_signal() await self.sleep(self.timeout * 0.96) finally: self._clear_signal() def _reset_signal(self) -> None: signal.signal(signal.SIGALRM, self._on_alarm) self._arm(self.timeout) def _clear_signal(self) -> None: self._arm(0) def _arm(self, timeout: float) -> None: arm_alarm(timeout) def _on_alarm(self, signum: int, frame: FrameType) -> None: msg = f'Blocking detected (timeout={self.timeout})' stack = ''.join(traceback.format_stack(frame)) self.log.warning('Blocking detected (timeout=%r) %s', self.timeout, stack) self._reset_signal() raise self.raises(msg)