mode.utils.queues

Queue utilities - variations of asyncio.Queue.

class mode.utils.queues.FlowControlEvent(*, initially_suspended: bool = True, loop: asyncio.events.AbstractEventLoop = None)

Manage flow control FlowControlQueue instances.

The FlowControlEvent manages flow in one or many queue instances at the same time.

To flow control queues, first create the shared event:

>>> flow_control = FlowControlEvent()

Then pass that shared event to the queues that should be managed by it:

>>> q1 = FlowControlQueue(maxsize=1, flow_control=flow_control)
>>> q2 = FlowControlQueue(flow_control=flow_control)

If you want the contents of the queue to be cleared when flow is resumed, then specify that by using the clear_on_resume flag:

>>> q3 = FlowControlQueue(clear_on_resume=True,
...                       flow_control=flow_control)

To suspend production into queues, use flow_control.suspend:

>>> flow_control.suspend()

While the queues are suspend, any producer attempting to send something to the queue will hang until flow is resumed.

To resume production into queues, use flow_control.resume:

>>> flow_control.resume()

Notes

In Faust queues are managed by the app.flow_control event.

manage_queue(queue: mode.utils.queues.FlowControlQueue) → None

Add FlowControlQueue to be cleared on resume.

suspend() → None

Suspend production into queues managed by this event.

resume() → None

Resume production into queues managed by this event.

is_active() → bool
clear() → None
async acquire() → None

Wait until flow control is resumed.

class mode.utils.queues.FlowControlQueue(maxsize: int = 0, *, flow_control: mode.utils.queues.FlowControlEvent, clear_on_resume: bool = False, **kwargs: Any)

asyncio.Queue managed by FlowControlEvent.

See also

FlowControlEvent.

clear() → None
async put(value: _T) → None

Put an item into the queue.

Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.

This method is a coroutine.

class mode.utils.queues.ThrowableQueue(*args: Any, **kwargs: Any)

Queue that can be notified of errors.

async get() → _T

Remove and return an item from the queue.

If queue is empty, wait until an item is available.

This method is a coroutine.

empty() → bool

Return True if the queue is empty, False otherwise.

clear() → None
get_nowait() → _T

Remove and return an item from the queue.

Return an item if one is immediately available, else raise QueueEmpty.

async throw(exc: BaseException) → None