Source code for mode.utils.queues

"""Queue utilities - variations of :class:`asyncio.Queue`."""
import asyncio
import typing
from collections import deque
from typing import Any, TypeVar
from weakref import WeakSet
from .locks import Event
from .typing import Deque

_T = TypeVar('_T')


[docs]class FlowControlEvent: """Manage flow control :class:`FlowControlQueue` instances. The FlowControlEvent manages flow in one or many queue instances at the same time. To flow control queues, first create the shared event:: >>> flow_control = FlowControlEvent() Then pass that shared event to the queues that should be managed by it:: >>> q1 = FlowControlQueue(maxsize=1, flow_control=flow_control) >>> q2 = FlowControlQueue(flow_control=flow_control) If you want the contents of the queue to be cleared when flow is resumed, then specify that by using the ``clear_on_resume`` flag:: >>> q3 = FlowControlQueue(clear_on_resume=True, ... flow_control=flow_control) To suspend production into queues, use ``flow_control.suspend``:: >>> flow_control.suspend() While the queues are suspend, any producer attempting to send something to the queue will hang until flow is resumed. To resume production into queues, use ``flow_control.resume``:: >>> flow_control.resume() Notes: In Faust queues are managed by the ``app.flow_control`` event. """ if typing.TYPE_CHECKING: _queues: WeakSet['FlowControlQueue'] _queues = None def __init__(self, *, initially_suspended: bool = True, loop: asyncio.AbstractEventLoop = None) -> None: self.loop = loop self._resume = Event(loop=self.loop) self._suspend = Event(loop=self.loop) if initially_suspended: self._suspend.set() self._queues = WeakSet()
[docs] def manage_queue(self, queue: 'FlowControlQueue') -> None: """Add :class:`FlowControlQueue` to be cleared on resume.""" self._queues.add(queue)
[docs] def suspend(self) -> None: """Suspend production into queues managed by this event.""" self._resume.clear() self._suspend.set()
[docs] def resume(self) -> None: """Resume production into queues managed by this event.""" self._suspend.clear() self._resume.set() self.clear()
[docs] def is_active(self) -> bool: return not self._suspend.is_set()
[docs] def clear(self) -> None: for queue in self._queues: queue.clear()
[docs] async def acquire(self) -> None: """Wait until flow control is resumed.""" if self._suspend.is_set(): await self._resume.wait()
[docs]class FlowControlQueue(asyncio.Queue): """:class:`asyncio.Queue` managed by :class:`FlowControlEvent`. See Also: :class:`FlowControlEvent`. """ def __init__(self, maxsize: int = 0, *, flow_control: FlowControlEvent = None, clear_on_resume: bool = False, **kwargs: Any) -> None: self._flow_control = flow_control self._clear_on_resume = clear_on_resume if self._clear_on_resume: self._flow_control.manage_queue(self) super().__init__(maxsize, **kwargs)
[docs] def clear(self) -> None: self._queue.clear() # type: ignore
[docs] async def put(self, value: _T) -> None: # type: ignore await self._flow_control.acquire() await super().put(value)
[docs]class ThrowableQueue(FlowControlQueue): """Queue that can be notified of errors.""" def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) self._errors: Deque[BaseException] = deque()
[docs] @typing.no_type_check async def get(self) -> _T: if self._errors: raise self._errors.popleft() return await super().get()
[docs] def empty(self) -> bool: return super().empty() and not self._errors
[docs] def clear(self) -> None: self._queue.clear() # type: ignore self._errors.clear() for putter in self._putters: putter.cancel() self._putters.clear()
[docs] def get_nowait(self) -> _T: if self._errors: raise self._errors.popleft() return super().get_nowait()
[docs] async def throw(self, exc: BaseException) -> None: self._throw(exc)
def _throw(self, exc: BaseException) -> None: waiters = self._getters # type: ignore while waiters: waiter = waiters.popleft() if not waiter.done(): waiter.set_exception(exc) break else: self._errors.append(exc)