mode.utils.queues¶
Queue utilities - variations of asyncio.Queue.
-
class
mode.utils.queues.FlowControlEvent(*, initially_suspended: bool = True, loop: asyncio.events.AbstractEventLoop = None)¶ Manage flow control for FlowControlQueue instances.
The FlowControlEvent manages flow in one or many queue instances at the same time.
To flow control queues, first create the shared event:
>>> flow_control = FlowControlEvent()
Then pass that shared event to the queues that should be managed by it:
>>> q1 = FlowControlQueue(maxsize=1, flow_control=flow_control)
>>> q2 = FlowControlQueue(flow_control=flow_control)
If you want the contents of the queue to be cleared when flow is resumed, specify that with the clear_on_resume flag:
>>> q3 = FlowControlQueue(clear_on_resume=True,
...                       flow_control=flow_control)
To suspend production into queues, use flow_control.suspend:
>>> flow_control.suspend()
While the queues are suspended, any producer attempting to send something to a queue will hang until flow is resumed.
To resume production into queues, use flow_control.resume:
>>> flow_control.resume()
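The suspend/resume behavior described above can be sketched with the standard library alone. This is a minimal illustration, not mode's implementation: `SketchFlowControl` and `SketchQueue` are hypothetical stand-ins built on `asyncio.Event` and `asyncio.Queue`, and the `clear_on_resume` behavior is deliberately left out.

```python
import asyncio


class SketchFlowControl:
    """Hypothetical stand-in for FlowControlEvent (assumption, not mode's code)."""

    def __init__(self, *, initially_suspended: bool = True) -> None:
        self._resumed = asyncio.Event()
        if not initially_suspended:
            self._resumed.set()

    def suspend(self) -> None:
        self._resumed.clear()

    def resume(self) -> None:
        self._resumed.set()

    def is_active(self) -> bool:
        return self._resumed.is_set()

    async def acquire(self) -> None:
        # Returns at once when flow is active, otherwise waits for resume().
        await self._resumed.wait()


class SketchQueue(asyncio.Queue):
    """Hypothetical stand-in for FlowControlQueue."""

    def __init__(self, maxsize: int = 0, *, flow_control: SketchFlowControl) -> None:
        super().__init__(maxsize)
        self._flow_control = flow_control

    async def put(self, value) -> None:
        await self._flow_control.acquire()  # gate every put on the shared event
        await super().put(value)


async def demo() -> list:
    flow_control = SketchFlowControl()  # suspended by default
    q1 = SketchQueue(maxsize=1, flow_control=flow_control)
    q2 = SketchQueue(flow_control=flow_control)
    log = []

    async def producer(queue, item):
        await queue.put(item)  # hangs while flow is suspended
        log.append(f"put {item}")

    tasks = [
        asyncio.create_task(producer(q1, "a")),
        asyncio.create_task(producer(q2, "b")),
    ]
    await asyncio.sleep(0.01)  # give both producers time to block
    log.append(f"while suspended: {q1.qsize()} + {q2.qsize()} items")
    flow_control.resume()  # one call releases producers on both queues
    await asyncio.gather(*tasks)
    log.append(f"after resume: {q1.qsize()} + {q2.qsize()} items")
    return log


if __name__ == "__main__":
    for line in asyncio.run(demo()):
        print(line)
```

Sharing one event object between the queues is what lets a single `suspend()` or `resume()` call affect every managed queue at once.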
Notes
In Faust, queues are managed by the app.flow_control event.
-
manage_queue(queue: mode.utils.queues.FlowControlQueue) → None¶ Add FlowControlQueue to be cleared on resume.
-
suspend() → None¶ Suspend production into queues managed by this event.
-
resume() → None¶ Resume production into queues managed by this event.
-
is_active() → bool¶
-
clear() → None¶
-
async
acquire() → None¶ Wait until flow control is resumed.
-
-
class
mode.utils.queues.FlowControlQueue(maxsize: int = 0, *, flow_control: mode.utils.queues.FlowControlEvent, clear_on_resume: bool = False, **kwargs: Any)¶ asyncio.Queue managed by FlowControlEvent.
See also: FlowControlEvent.
-
clear() → None¶
-
async
put(value: _T) → None¶ Put an item into the queue.
If the queue is full, wait until a free slot is available before adding the item.
This method is a coroutine.
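The "wait until a free slot is available" behavior is inherited from `asyncio.Queue`, so it can be shown with the standard library class alone. The sketch below uses a plain `asyncio.Queue(maxsize=1)` as a stand-in; a FlowControlQueue would presumably also wait on its FlowControlEvent, which is not modeled here.

```python
import asyncio


async def demo() -> list:
    queue = asyncio.Queue(maxsize=1)  # stand-in for a bounded FlowControlQueue
    events = []

    async def producer() -> None:
        for item in ("first", "second"):
            await queue.put(item)  # the second put waits for a free slot
            events.append(f"put {item}")

    task = asyncio.create_task(producer())
    await asyncio.sleep(0.01)  # let the producer run until it blocks
    events.append(f"full={queue.full()}")
    events.append(f"got {await queue.get()}")  # frees a slot for the producer
    await task
    events.append(f"got {await queue.get()}")
    return events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The blocked `put` only completes after the consumer removes an item, which is the back-pressure a bounded queue gives a fast producer.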
-
-
class
mode.utils.queues.ThrowableQueue(*args: Any, **kwargs: Any)¶ Queue that can be notified of errors.
-
async
get() → _T¶ Remove and return an item from the queue.
If queue is empty, wait until an item is available.
This method is a coroutine.
-
empty() → bool¶ Return True if the queue is empty, False otherwise.
-
clear() → None¶
-
get_nowait() → _T¶ Remove and return an item from the queue.
Return an item if one is immediately available, else raise QueueEmpty.
-
async
throw(exc: BaseException) → None¶
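The reference gives no description for throw() beyond the class summary, "Queue that can be notified of errors". One way such a queue could behave is sketched below: an exception passed to throw() is raised in the consumer that calls get(). `SketchThrowableQueue` and `_Raised` are hypothetical names, and delivering errors in FIFO order alongside ordinary items is an assumption of this sketch; the library may order or deliver them differently.

```python
import asyncio


class _Raised:
    """Hypothetical wrapper marking a queued item as an error to raise."""

    def __init__(self, exc: BaseException) -> None:
        self.exc = exc


class SketchThrowableQueue:
    """Hypothetical stand-in for ThrowableQueue, wrapping an asyncio.Queue."""

    def __init__(self, maxsize: int = 0) -> None:
        self._queue = asyncio.Queue(maxsize)

    async def put(self, value) -> None:
        await self._queue.put(value)

    async def throw(self, exc: BaseException) -> None:
        # Assumption: the error travels through the queue like any other item.
        await self._queue.put(_Raised(exc))

    async def get(self):
        return self._unwrap(await self._queue.get())

    def get_nowait(self):
        return self._unwrap(self._queue.get_nowait())

    def empty(self) -> bool:
        return self._queue.empty()

    @staticmethod
    def _unwrap(item):
        if isinstance(item, _Raised):
            raise item.exc
        return item


async def demo() -> str:
    queue = SketchThrowableQueue()
    consumer = asyncio.create_task(queue.get())  # waits: the queue is empty
    await asyncio.sleep(0.01)
    await queue.throw(ValueError("boom"))  # wakes the waiting consumer
    try:
        await consumer
    except ValueError as exc:
        return f"consumer saw: {exc}"
    return "no error"


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Raising the error from get() lets a producer report a failure to a consumer that would otherwise wait indefinitely on an empty queue.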
-
async