Source code for mode.utils.loops

"""Event loop utilities."""
import asyncio
from typing import Any, Callable

__all__ = ['clone_loop', 'call_asap']


def _is_unix_loop(loop: asyncio.AbstractEventLoop) -> bool:
    try:
        from asyncio import unix_events
    except ImportError:
        return False
    else:
        return isinstance(loop, unix_events._UnixSelectorEventLoop)


def clone_loop(loop: asyncio.AbstractEventLoop) -> asyncio.AbstractEventLoop:
    """Create a fresh event loop that keeps the signal handlers of *loop*.

    A new loop is always created with ``asyncio.new_event_loop()``.  When
    *loop* passes ``_is_unix_loop``, every entry in its private
    ``_signal_handlers`` mapping is registered on the new loop, using the
    callable produced by ``_appropriate_signal_handler(loop, handle)``.
    For any other loop the new loop is returned without signal handlers.
    """
    cloned = asyncio.new_event_loop()
    if not _is_unix_loop(loop):
        return cloned
    for sig, registered in loop._signal_handlers.items():
        forwarder = _appropriate_signal_handler(loop, registered)
        cloned.add_signal_handler(sig, forwarder)
    return cloned
def _appropriate_signal_handler( parent_loop: asyncio.AbstractEventLoop, handle: asyncio.Handle) -> Callable: callback = handle._callback context = getattr(handle, '_context', None) # CPython 3.7+ callback_args = handle._args def _call_using_parent_loop() -> None: _call_asap(parent_loop, callback, *callback_args, context=context) return _call_using_parent_loop
def call_asap(callback: Callable,
              *args: Any,
              context: Any = None,
              loop: asyncio.AbstractEventLoop = None) -> asyncio.Handle:
    """Schedule *callback* to run as soon as possible on *loop*.

    For loops accepted by ``_is_unix_loop`` the call is delegated to
    ``_call_asap``, which puts it at the front of the line.  Any other
    loop falls back to ``loop.call_soon_threadsafe``; ``context`` is only
    forwarded there when it is not ``None``.

    *loop* is keyword-only and must be provided: a missing (falsy) loop
    fails the assertion below.
    """
    assert loop
    if _is_unix_loop(loop):
        return _call_asap(loop, callback, *args, context=context)
    # Pass ``context`` along only when the caller actually supplied one.
    optional = {} if context is None else {'context': context}
    return loop.call_soon_threadsafe(callback, *args, **optional)
def _call_asap(loop: Any, callback: Callable, *args: Any, context: Any = None) -> asyncio.Handle: loop._check_closed() if loop._debug: loop._check_callback(callback, 'call_soon_threadsafe') handle = loop._call_soon(callback, args, context) if context is not None: handle = asyncio.Handle(callback, args, loop, context) else: handle = asyncio.Handle(callback, args, loop) if handle._source_traceback: del handle._source_traceback[-1] loop._ready.insert(0, handle) if handle._source_traceback: del handle._source_traceback[-1] loop._write_to_self() return handle