mode.utils.queues
¶
Queue utilities - variations of asyncio.Queue
.
-
class
mode.utils.queues.
FlowControlEvent
(*, initially_suspended: bool = True, loop: asyncio.events.AbstractEventLoop = None)¶ Manage flow control
FlowControlQueue
instances.The FlowControlEvent manages flow in one or many queue instances at the same time.
To flow control queues, first create the shared event:
>>> flow_control = FlowControlEvent()
Then pass that shared event to the queues that should be managed by it:
>>> q1 = FlowControlQueue(maxsize=1, flow_control=flow_control) >>> q2 = FlowControlQueue(flow_control=flow_control)
If you want the contents of the queue to be cleared when flow is resumed, then specify that by using the
clear_on_resume
flag:>>> q3 = FlowControlQueue(clear_on_resume=True, ... flow_control=flow_control)
To suspend production into queues, use
flow_control.suspend
:>>> flow_control.suspend()
While the queues are suspend, any producer attempting to send something to the queue will hang until flow is resumed.
To resume production into queues, use
flow_control.resume
:>>> flow_control.resume()
Notes
In Faust queues are managed by the
app.flow_control
event.-
manage_queue
(queue: mode.utils.queues.FlowControlQueue) → None¶ Add
FlowControlQueue
to be cleared on resume.
-
suspend
() → None¶ Suspend production into queues managed by this event.
-
resume
() → None¶ Resume production into queues managed by this event.
-
is_active
() → bool¶
-
clear
() → None¶
-
async
acquire
() → None¶ Wait until flow control is resumed.
-
-
class
mode.utils.queues.
FlowControlQueue
(maxsize: int = 0, *, flow_control: mode.utils.queues.FlowControlEvent, clear_on_resume: bool = False, **kwargs: Any)¶ asyncio.Queue
managed byFlowControlEvent
.See also
-
clear
() → None¶
-
async
put
(value: _T) → None¶ Put an item into the queue.
Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.
This method is a coroutine.
-
-
class
mode.utils.queues.
ThrowableQueue
(*args: Any, **kwargs: Any)¶ Queue that can be notified of errors.
-
async
get
() → _T¶ Remove and return an item from the queue.
If queue is empty, wait until an item is available.
This method is a coroutine.
-
empty
() → bool¶ Return True if the queue is empty, False otherwise.
-
clear
() → None¶
-
get_nowait
() → _T¶ Remove and return an item from the queue.
Return an item if one is immediately available, else raise QueueEmpty.
-
async
throw
(exc: BaseException) → None¶
-
async