Source code for mode.utils.times

"""Time, date and timezone related utilities."""
import abc
import asyncio
import sys
import time
from datetime import timedelta
from functools import singledispatch
from types import TracebackType
from typing import Callable, List, Mapping, NamedTuple, Optional, Type, Union

from .text import pluralize
from .typing import AsyncContextManager

__all__ = [
    'Bucket',
    'Seconds',
    'TokenBucket',
    'rate',
    'rate_limit',
    'want_seconds',
]

TIME_MONOTONIC: Callable[[], float]
if sys.platform == 'win32':
    TIME_MONOTONIC = time.time
else:
    TIME_MONOTONIC = time.monotonic

#: Seconds can be expressed as float or :class:`~datetime.timedelta`,
Seconds = Union[timedelta, float, str]


class Unit(NamedTuple):
    name: str
    value: float
    format: Callable[[float], str]  # noqa: E701


TIME_UNITS: List[Unit] = [
    Unit('day', 60 * 60 * 24.0, lambda n: format(n, '.2f')),
    Unit('hour', 60 * 60.0, lambda n: format(n, '.2f')),
    Unit('minute', 60.0, lambda n: format(n, '.2f')),
    Unit('second', 1.0, lambda n: format(n, '.2f')),
]

#: What the characters in a "rate" string means.
#: E.g. 8/s is "eight in one second"
RATE_MODIFIER_MAP: Mapping[str, Callable[[float], float]] = {
    's': lambda n: n,
    'm': lambda n: n / 60.0,
    'h': lambda n: n / 60.0 / 60.0,
    'd': lambda n: n / 60.0 / 60.0 / 24,
}


[docs]class Bucket(AsyncContextManager): """Rate limiting state. A bucket "pours" tokens at a rate of ``rate`` per second (or over'). Calling `bucket.pour()`, pours one token by default, and returns :const:`True` if that amount can be poured now, or :const:`False` if the caller has to wait. If this returns :const:`False`, it's prudent to either sleep or raise an exception:: if not bucket.pour(): await asyncio.sleep(bucket.expected_time()) If you want to consume multiple tokens in one go then specify the number:: if not bucket.pour(10): await asyncio.sleep(bucket.expected_time(10)) This class can also be used as an async. context manager, but in that case can only consume one tokens at a time:: async with bucket: # do something By default the async. context manager will suspend the current coroutine and sleep until as soon as the time that a token can be consumed. If you wish you can also raise an exception, instead of sleeping, by providing the ``raises`` keyword argument:: # hundred tokens in one second, and async with: raises TimeoutError class MyError(Exception): pass bucket = Bucket(100, over=1.0, raises=MyError) async with bucket: # do something """ rate: float fill_rate: float capacity: float _tokens: float def __init__(self, rate: Seconds, over: Seconds = 1.0, *, fill_rate: Seconds = None, capacity: Seconds = None, raises: Type[BaseException] = None, loop: asyncio.AbstractEventLoop = None) -> None: self.rate = want_seconds(rate) self.capacity = want_seconds(over) self.raises = raises self.loop = loop self._tokens = self.capacity self.__post_init__() def __post_init__(self) -> None: ...
[docs] @abc.abstractmethod def pour(self, tokens: int = 1) -> bool: ...
[docs] @abc.abstractmethod def expected_time(self, tokens: int = 1) -> float: ...
@property @abc.abstractmethod def tokens(self) -> float: ... @property def fill_rate(self) -> float: #: Defaults to rate! If you want the bucket to fill up #: faster/slower, then just override this. return self.rate async def __aenter__(self) -> 'Bucket': if not self.pour(): if self.raises: raise self.raises() expected_time = self.expected_time() await asyncio.sleep(expected_time, loop=self.loop) return self async def __aexit__(self, exc_type: Type[BaseException] = None, exc_val: BaseException = None, exc_tb: TracebackType = None) -> Optional[bool]: return None
[docs]class TokenBucket(Bucket): """Rate limiting using the token bucket algorithm.""" _tokens: float _last_pour: float def __post_init__(self) -> None: self._last_pour = TIME_MONOTONIC()
[docs] def pour(self, tokens: int = 1) -> bool: need = tokens have = self.tokens if have < need: return False self._tokens -= tokens return True
[docs] def expected_time(self, tokens: int = 1) -> float: have = self._tokens need = max(tokens, have) time_left = (need - have) / self.fill_rate return max(time_left, 0.0)
@property def tokens(self) -> float: now = TIME_MONOTONIC() if now < self._last_pour: return self._tokens if self._tokens < self.capacity: delta = self.fill_rate * (now - self._last_pour) self._tokens = min(self.capacity, self._tokens + delta) self._last_pour = now return self._tokens
[docs]@singledispatch def rate(r: float) -> float: """Convert rate string (`"100/m"`, `"2/h"` or `"0.5/s"`) to seconds.""" return r
@rate.register(str) def _(r: str) -> float: # noqa: F811 ops, _, modifier = r.partition('/') return RATE_MODIFIER_MAP[modifier or 's'](float(ops)) or 0 @rate.register(int) # noqa: F811 def _(r: int) -> float: return float(r) @rate.register(type(None)) # noqa: F811 def _(r: type(None)) -> float: return 0.0
[docs]def rate_limit(rate: float, over: Seconds = 1.0, *, bucket_type: Type[Bucket] = TokenBucket, raises: Type[BaseException] = None, loop: asyncio.AbstractEventLoop = None) -> Bucket: """Create rate limiting manager.""" return bucket_type(rate, over, raises=raises, loop=loop)
[docs]@singledispatch def want_seconds(s: float) -> float: """Convert :data:`Seconds` to float.""" return s
@want_seconds.register(str) # noqa: F811 def _(s: str) -> float: return rate(s) @want_seconds.register(timedelta) # noqa: F811 def _(s: timedelta) -> float: return s.total_seconds() def humanize_seconds(secs: float, *, prefix: str = '', sep: str = '', now: str = 'now', microseconds: bool = False) -> str: """Show seconds in human form. For example, 60 becomes "1 minute", and 7200 becomes "2 hours". Arguments: prefix (str): can be used to add a preposition to the output (e.g., 'in' will give 'in 1 second', but add nothing to 'now'). now (str): Literal 'now'. microseconds (bool): Include microseconds. """ secs = float(format(float(secs), '.2f')) for unit, divider, formatter in TIME_UNITS: if secs >= divider: w = secs / float(divider) return '{0}{1}{2} {3}'.format(prefix, sep, formatter(w), pluralize(w, unit)) if microseconds and secs > 0.0: return '{prefix}{sep}{0:.2f} seconds'.format( secs, sep=sep, prefix=prefix) return now