Source code for mode.utils.tracebacks

"""Traceback utilities."""
import asyncio
import inspect
import io
import sys
from traceback import StackSummary, print_list, walk_tb
from types import FrameType
from typing import (
    Any,
    Coroutine,
    Generator,
    IO,
    Mapping,
    Optional,
    Union,
)

__all__ = [
    'Traceback',
    'format_task_stack',
    'print_task_stack',
]

DEFAULT_MAX_FRAMES = sys.getrecursionlimit() // 8





[docs]def format_task_stack(task: asyncio.Task, *, limit: int = DEFAULT_MAX_FRAMES, capture_locals: bool = False) -> None: """Format :class:`asyncio.Task` stack trace as a string.""" f = io.StringIO() print_task_stack(task, file=f, limit=limit, capture_locals=capture_locals) return f.getvalue()
class _CustomCode: co_filename: str co_name: str def __init__(self, filename: str, name: str) -> None: self.co_filename = filename self.co_name = name class _CustomFrame: f_globals: Mapping[str, Any] f_fileno: int f_code: _CustomCode def __init__(self, globals: Mapping[str, Any], fileno: int, code: _CustomCode) -> None: self.f_globals = globals self.f_fileno = fileno self.f_code = code self.f_locals = {} class _BaseTraceback: tb_frame: FrameType tb_lineno: int tb_lasti: int tb_next: Optional['Traceback'] class _Truncated(_BaseTraceback): def __init__(self, filename='...', name='[rest of traceback truncated]'): self.tb_lineno = -1 self.tb_frame = _CustomFrame( globals={ '__file__': '', '__name__': '', '__loader__': None, }, fileno=-1, code=_CustomCode( filename=filename, name=name, ), ) self.tb_next = None self.tb_lasti = -1
[docs]class Traceback(_BaseTraceback): """Traceback object with truncated frames.""" def __init__(self, frame: FrameType, lineno: int = None, lasti: int = None) -> None: self.tb_frame = frame self.tb_lineno = lineno if lineno is not None else frame.f_lineno self.tb_lasti = lasti if lasti is not None else frame.f_lasti self.tb_next = None
[docs] @classmethod def from_task(cls, task: asyncio.Task, *, limit: int = DEFAULT_MAX_FRAMES) -> 'Traceback': coro = task._coro # type: ignore return cls.from_coroutine(coro, limit=limit)
[docs] @classmethod def from_coroutine(cls, coro: Union[Coroutine, Generator], *, depth: int = 0, limit: int = DEFAULT_MAX_FRAMES) -> 'Traceback': try: frame = cls._get_coroutine_frame(coro) except AttributeError: if type(coro).__name__ == 'async_generator_asend': return _Truncated(filename='async_generator_asend') raise if limit is None: limit = getattr(sys, 'tracebacklimit', None) if limit is not None and limit < 0: limit = 0 frames = [] num_frames = 0 current_frame = frame while current_frame is not None: if limit is not None: if num_frames > limit: break frames.append(current_frame) num_frames += 1 current_frame = current_frame.f_back frames.reverse() prev = None root: Traceback = None for f in frames: tb = cls(f) if root is None: root = tb if prev is not None: prev.tb_next = tb prev = tb cr_await = cls._get_coroutine_next(coro) if cr_await is not None and asyncio.iscoroutine(cr_await): next_node: Traceback if limit is not None and depth > limit: next_node = _Truncated() else: next_node = cls.from_coroutine( cr_await, limit=limit, depth=depth + 1) if root is not None: root.tb_next = next_node else: return next_node return root
@staticmethod def _get_coroutine_frame(coro: Union[Coroutine, Generator]) -> FrameType: try: if inspect.isgenerator(coro): # is a @asyncio.coroutine wrapped generator return coro.gi_frame else: # is an async def function return coro.cr_frame except AttributeError as exc: raise AttributeError( 'WHAT IS THIS? str={0} repr={1!r} typ={2!r} dir={3}'.format( coro, coro, type(coro), dir(coro))) from exc @staticmethod def _get_coroutine_next(coro: Union[Coroutine, Generator]) -> Any: if inspect.isgenerator(coro): # is a @asyncio.coroutine wrapped generator return coro.gi_yieldfrom else: # is an async def function return coro.cr_await