mode.utils.times

Time, date and timezone related utilities.

mode.utils.times.Seconds = typing.Union[datetime.timedelta, float, str]

Seconds can be expressed as float or timedelta,

class mode.utils.times.Bucket(rate: Union[datetime.timedelta, float, str], over: Union[datetime.timedelta, float, str] = 1.0, *, fill_rate: Union[datetime.timedelta, float, str] = None, capacity: Union[datetime.timedelta, float, str] = None, raises: Type[BaseException] = None, loop: asyncio.events.AbstractEventLoop = None)[source]

Rate limiting state.

A bucket “pours” tokens at a rate of rate per second (or over’).

Calling bucket.pour(), pours one token by default, and returns True if that amount can be poured now, or False if the caller has to wait.

If this returns False, it’s prudent to either sleep or raise an exception:

if not bucket.pour():
    await asyncio.sleep(bucket.expected_time())

If you want to consume multiple tokens in one go then specify the number:

if not bucket.pour(10):
    await asyncio.sleep(bucket.expected_time(10))

This class can also be used as an async. context manager, but in that case can only consume one tokens at a time:

async with bucket:
    # do something

By default the async. context manager will suspend the current coroutine and sleep until as soon as the time that a token can be consumed.

If you wish you can also raise an exception, instead of sleeping, by providing the raises keyword argument:

# hundred tokens in one second, and async with: raises TimeoutError

class MyError(Exception):
    pass

bucket = Bucket(100, over=1.0, raises=MyError)

async with bucket:
    # do something
abstract pour(tokens: int = 1) → bool[source]
abstract expected_time(tokens: int = 1) → float[source]
abstract property tokens
property fill_rate
class mode.utils.times.TokenBucket(rate: Union[datetime.timedelta, float, str], over: Union[datetime.timedelta, float, str] = 1.0, *, fill_rate: Union[datetime.timedelta, float, str] = None, capacity: Union[datetime.timedelta, float, str] = None, raises: Type[BaseException] = None, loop: asyncio.events.AbstractEventLoop = None)[source]

Rate limiting using the token bucket algorithm.

pour(tokens: int = 1) → bool[source]
expected_time(tokens: int = 1) → float[source]
property tokens
mode.utils.times.rate(r: float) → float[source]

Convert rate string (“100/m”, “2/h” or “0.5/s”) to seconds.

mode.utils.times.rate_limit(rate: float, over: Union[datetime.timedelta, float, str] = 1.0, *, bucket_type: Type[mode.utils.times.Bucket] = mode.utils.times.TokenBucket, raises: Type[BaseException] = None, loop: asyncio.events.AbstractEventLoop = None) → mode.utils.times.Bucket[source]

Create rate limiting manager.

mode.utils.times.want_seconds(s: float) → float[source]

Convert Seconds to float.