Source code for mode.utils.futures

"""Async I/O Future utilities."""
import asyncio
from inspect import isawaitable
from typing import Any, Callable, Optional, Set, Type

# These used to be here, now moved to .queues
from .queues import FlowControlEvent, FlowControlQueue  # noqa: F401

try:  # pragma: no cover
    from asyncio import all_tasks
except ImportError:  # pragma: no cover
[docs] def all_tasks( loop: asyncio.AbstractEventLoop) -> Set[asyncio.Task]: # noqa return asyncio.Task.all_tasks(loop=loop)
try: # pragma: no cover from asyncio import current_task # type: ignore except ImportError: # pragma: no cover current_task = asyncio.Task.current_task __all__ = [ 'all_tasks', 'current_task', 'done_future', 'maybe_async', 'maybe_cancel', 'stampede', 'notify', ] class StampedeWrapper: fut: asyncio.Future = None def __init__(self, fun: Callable, *args: Any, loop: asyncio.AbstractEventLoop = None, **kwargs: Any) -> None: self.fun = fun self.args = args self.kwargs = kwargs self.loop = loop async def __call__(self) -> Any: await asyncio.sleep(0) fut = self.fut if fut is None: fut = asyncio.Future(loop=self.loop) if self.fut is None: self.fut = fut try: result = await self.fun(*self.args, **self.kwargs) fut.set_result(result) except asyncio.CancelledError: fut.cancel() raise finally: await asyncio.sleep(0) self.fut = None return result if fut.done(): return fut.result() return await fut
[docs]class stampede: """Descriptor for cached async operations providing stampede protection. See also thundering herd problem. Adding the decorator to an async callable method: Examples: Here's an example coroutine method connecting a network client: .. sourcecode:: python class Client: @stampede async def maybe_connect(self): await self._connect() async def _connect(self): return Connection() In the above example, if multiple coroutines call ``maybe_connect`` at the same time, then only one of them will actually perform the operation. The rest of the coroutines will wait for the result, and return it once the first caller returns. """ def __init__(self, fget: Callable, *, doc: str = None) -> None: self.__get = fget self.__doc__ = doc or fget.__doc__ self.__name__ = fget.__name__ self.__module__ = fget.__module__ def __get__(self, obj: Any, type: Type = None) -> Any: if obj is None: return self try: w = obj.__dict__[self.__name__] except KeyError: w = obj.__dict__[self.__name__] = StampedeWrapper(self.__get, obj) return w
[docs]def done_future(result: Any = None, *, loop: asyncio.AbstractEventLoop = None) -> asyncio.Future: """Return :class:`asyncio.Future` that is already evaluated.""" f = (loop or asyncio.get_event_loop()).create_future() f.set_result(result) return f
async def maybe_async(res: Any) -> Any: """Await future if argument is Awaitable. Examples: >>> await maybe_async(regular_function(arg)) >>> await maybe_async(async_function(arg)) """ if isawaitable(res): return await res return res
[docs]def maybe_cancel(fut: asyncio.Future) -> bool: """Cancel future if it is cancellable.""" if fut is not None and not fut.done(): return fut.cancel() return False
[docs]def notify(fut: Optional[asyncio.Future], result: Any = None) -> None: """Set :class:`asyncio.Future` result if future exists and is not done.""" # can be used to turn a Future into a lockless, single-consumer condition, # for multi-consumer use asyncio.Condition if fut is not None and not fut.done(): fut.set_result(result)