Source code for mode.utils.futures
"""Async I/O Future utilities."""
import asyncio
from inspect import isawaitable
from typing import Any, Callable, Optional, Set, Type
# These used to be here, now moved to .queues
from .queues import FlowControlEvent, FlowControlQueue # noqa: F401
try: # pragma: no cover
from asyncio import all_tasks
except ImportError: # pragma: no cover
[docs] def all_tasks(
loop: asyncio.AbstractEventLoop) -> Set[asyncio.Task]: # noqa
return asyncio.Task.all_tasks(loop=loop)
try: # pragma: no cover
from asyncio import current_task # type: ignore
except ImportError: # pragma: no cover
current_task = asyncio.Task.current_task
__all__ = [
'all_tasks',
'current_task',
'done_future',
'maybe_async',
'maybe_cancel',
'maybe_set_exception',
'maybe_set_result',
'stampede',
'notify',
]
class StampedeWrapper:
fut: asyncio.Future = None
def __init__(self,
fun: Callable,
*args: Any,
loop: asyncio.AbstractEventLoop = None,
**kwargs: Any) -> None:
self.fun = fun
self.args = args
self.kwargs = kwargs
self.loop = loop
async def __call__(self) -> Any:
await asyncio.sleep(0)
fut = self.fut
if fut is None:
fut = asyncio.Future(loop=self.loop)
self.fut = fut
try:
result = await self.fun(*self.args, **self.kwargs)
fut.set_result(result)
await asyncio.sleep(0)
except asyncio.CancelledError:
fut.cancel()
raise
finally:
await asyncio.sleep(0)
self.fut = None
return result
if fut.done():
return fut.result()
return await fut
[docs]class stampede:
"""Descriptor for cached async operations providing stampede protection.
See also thundering herd problem.
Adding the decorator to an async callable method:
Examples:
Here's an example coroutine method connecting a network client:
.. sourcecode:: python
class Client:
@stampede
async def maybe_connect(self):
await self._connect()
async def _connect(self):
return Connection()
In the above example, if multiple coroutines call ``maybe_connect``
at the same time, then only one of them will actually perform the
operation. The rest of the coroutines will wait for the result,
and return it once the first caller returns.
"""
def __init__(self, fget: Callable, *, doc: str = None) -> None:
self.__get = fget
self.__doc__ = doc or fget.__doc__
self.__name__ = fget.__name__
self.__module__ = fget.__module__
self.__wrapped__ = fget
def __call__(self, *args, **kwargs):
# here to support inspect.signature
raise NotImplementedError()
def __get__(self, obj: Any, type: Type = None) -> Any:
if obj is None:
return self
try:
w = obj.__dict__[self.__name__]
except KeyError:
w = obj.__dict__[self.__name__] = StampedeWrapper(self.__get, obj)
return w
[docs]def done_future(result: Any = None, *,
loop: asyncio.AbstractEventLoop = None) -> asyncio.Future:
"""Return :class:`asyncio.Future` that is already evaluated."""
f = (loop or asyncio.get_event_loop()).create_future()
f.set_result(result)
return f
[docs]async def maybe_async(res: Any) -> Any:
"""Await future if argument is Awaitable.
Examples:
>>> await maybe_async(regular_function(arg))
>>> await maybe_async(async_function(arg))
"""
if isawaitable(res):
return await res
return res
[docs]def maybe_cancel(fut: asyncio.Future) -> bool:
"""Cancel future if it is cancellable."""
if fut is not None and not fut.done():
return fut.cancel()
return False
[docs]def maybe_set_exception(fut: asyncio.Future, exc: BaseException) -> bool:
"""Set future exception if not already done."""
if fut is not None and not fut.done():
fut.set_exception(exc)
return True
return False
[docs]def maybe_set_result(fut: asyncio.Future, result: Any) -> bool:
"""Set future result if not already done."""
if fut is not None and not fut.done():
fut.set_result(result)
return True
return False
[docs]def notify(fut: Optional[asyncio.Future], result: Any = None) -> None:
"""Set :class:`asyncio.Future` result if future exists and is not done."""
# can be used to turn a Future into a lockless, single-consumer condition,
# for multi-consumer use asyncio.Condition
if fut is not None and not fut.done():
fut.set_result(result)