"""Traceback utilities."""
import asyncio
import inspect
import io
import sys
from traceback import StackSummary, print_list, walk_tb
from types import FrameType
from typing import (
Any,
Coroutine,
Generator,
IO,
Mapping,
Optional,
Union,
)
__all__ = [
'Traceback',
'format_task_stack',
'print_task_stack',
]
DEFAULT_MAX_FRAMES = sys.getrecursionlimit() // 8
[docs]def print_task_stack(task: asyncio.Task, *,
file: IO = sys.stderr,
limit: int = DEFAULT_MAX_FRAMES,
capture_locals: bool = False) -> None:
"""Print the stack trace for an :class:`asyncio.Task`."""
print(f'Stack for {task!r} (most recent call last):', file=file)
tb = Traceback.from_task(task, limit=limit)
print_list(
StackSummary.extract(
walk_tb(tb),
limit=limit,
capture_locals=capture_locals,
),
file=file,
)
class _CustomCode:
co_filename: str
co_name: str
def __init__(self, filename: str, name: str) -> None:
self.co_filename = filename
self.co_name = name
class _CustomFrame:
f_globals: Mapping[str, Any]
f_fileno: int
f_code: _CustomCode
def __init__(self,
globals: Mapping[str, Any],
fileno: int,
code: _CustomCode) -> None:
self.f_globals = globals
self.f_fileno = fileno
self.f_code = code
self.f_locals = {}
class _BaseTraceback:
tb_frame: FrameType
tb_lineno: int
tb_lasti: int
tb_next: Optional['Traceback']
class _Truncated(_BaseTraceback):
def __init__(self, filename='...', name='[rest of traceback truncated]'):
self.tb_lineno = -1
self.tb_frame = _CustomFrame(
globals={
'__file__': '',
'__name__': '',
'__loader__': None,
},
fileno=-1,
code=_CustomCode(
filename=filename,
name=name,
),
)
self.tb_next = None
self.tb_lasti = -1
[docs]class Traceback(_BaseTraceback):
"""Traceback object with truncated frames."""
def __init__(self,
frame: FrameType,
lineno: int = None,
lasti: int = None) -> None:
self.tb_frame = frame
self.tb_lineno = lineno if lineno is not None else frame.f_lineno
self.tb_lasti = lasti if lasti is not None else frame.f_lasti
self.tb_next = None
[docs] @classmethod
def from_task(cls, task: asyncio.Task, *,
limit: int = DEFAULT_MAX_FRAMES) -> 'Traceback':
coro = task._coro # type: ignore
return cls.from_coroutine(coro, limit=limit)
[docs] @classmethod
def from_coroutine(cls, coro: Union[Coroutine, Generator], *,
depth: int = 0,
limit: int = DEFAULT_MAX_FRAMES) -> 'Traceback':
try:
frame = cls._get_coroutine_frame(coro)
except AttributeError:
if type(coro).__name__ == 'async_generator_asend':
return _Truncated(filename='async_generator_asend')
raise
if limit is None:
limit = getattr(sys, 'tracebacklimit', None)
if limit is not None and limit < 0:
limit = 0
frames = []
num_frames = 0
current_frame = frame
while current_frame is not None:
if limit is not None:
if num_frames > limit:
break
frames.append(current_frame)
num_frames += 1
current_frame = current_frame.f_back
frames.reverse()
prev = None
root: Traceback = None
for f in frames:
tb = cls(f)
if root is None:
root = tb
if prev is not None:
prev.tb_next = tb
prev = tb
cr_await = cls._get_coroutine_next(coro)
if cr_await is not None and asyncio.iscoroutine(cr_await):
next_node: Traceback
if limit is not None and depth > limit:
next_node = _Truncated()
else:
next_node = cls.from_coroutine(
cr_await, limit=limit, depth=depth + 1)
if root is not None:
root.tb_next = next_node
else:
return next_node
return root
@staticmethod
def _get_coroutine_frame(coro: Union[Coroutine, Generator]) -> FrameType:
try:
if inspect.isgenerator(coro):
# is a @asyncio.coroutine wrapped generator
return coro.gi_frame
else:
# is an async def function
return coro.cr_frame
except AttributeError as exc:
raise AttributeError(
'WHAT IS THIS? str={0} repr={1!r} typ={2!r} dir={3}'.format(
coro, coro, type(coro), dir(coro))) from exc
@staticmethod
def _get_coroutine_next(coro: Union[Coroutine, Generator]) -> Any:
if inspect.isgenerator(coro):
# is a @asyncio.coroutine wrapped generator
return coro.gi_yieldfrom
else:
# is an async def function
return coro.cr_await