mode

AsyncIO Service-based programming.

class mode.Service(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None)[source]

An asyncio service that can be started/stopped/restarted.

Keyword Arguments
abstract = False
class Diag(service: mode.types.services.ServiceT)

Service diagnostics.

This can be used to track what your service is doing. For example, if your service is a Kafka consumer with a background task that commits the offset every 30 seconds, you may want to see when this happens:

DIAG_COMMITTING = 'committing'

class Consumer(Service):

    @Service.task
    async def _background_commit(self) -> None:
        while not self.should_stop:
            await self.sleep(30.0)
            self.diag.set_flag(DIAG_COMMITTING)
            try:
                await self._consumer.commit()
            finally:
                self.diag.unset_flag(DIAG_COMMITTING)

The above code is setting the flag manually, but you can also use a decorator to accomplish the same thing:

@Service.timer(30.0)
async def _background_commit(self) -> None:
    await self.commit()

@Service.transitions_to(DIAG_COMMITTING)
async def commit(self) -> None:
    await self._consumer.commit()
set_flag(flag: str) → None
unset_flag(flag: str) → None
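
The set/unset pattern described above can be sketched with the standard library alone. This is a minimal illustration, not mode's implementation: FlagSet, the transitions_to function defined here, and this Consumer class are hypothetical stand-ins that only mirror the names used in the example.

```python
import asyncio
import functools
from typing import Any, Callable, List, Set, Tuple


class FlagSet:
    """Hypothetical stand-in for Service.Diag: tracks active flags."""

    def __init__(self) -> None:
        self.flags: Set[str] = set()
        self.history: List[Tuple[str, str]] = []

    def set_flag(self, flag: str) -> None:
        self.flags.add(flag)
        self.history.append(('set', flag))

    def unset_flag(self, flag: str) -> None:
        self.flags.discard(flag)
        self.history.append(('unset', flag))


def transitions_to(flag: str) -> Callable:
    """Decorator: keep `flag` set while the wrapped coroutine method runs."""
    def decorate(fun: Callable) -> Callable:
        @functools.wraps(fun)
        async def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
            self.diag.set_flag(flag)
            try:
                return await fun(self, *args, **kwargs)
            finally:
                # Runs even if the wrapped coroutine raises.
                self.diag.unset_flag(flag)
        return wrapper
    return decorate


class Consumer:
    """Hypothetical consumer that records which flags were active."""

    def __init__(self) -> None:
        self.diag = FlagSet()
        self.seen_during_commit: Set[str] = set()

    @transitions_to('committing')
    async def commit(self) -> None:
        self.seen_during_commit = set(self.diag.flags)
        await asyncio.sleep(0)
```

The try/finally is the important part: the flag is cleared even when the commit fails, so a stale flag never suggests that work is still in progress.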
wait_for_shutdown = False

Set to True if .stop must wait for the shutdown flag to be set.

shutdown_timeout = 60.0

Time to wait for the shutdown flag to be set before giving up.

restart_count = 0

Current number of times this service instance has been restarted.

mundane_level = 'info'

The log level for mundane info such as starting, stopping, etc. Set this to "debug" for less information.

classmethod from_awaitable(coro: Awaitable, *, name: str = None, **kwargs: Any) → mode.types.services.ServiceT[source]
classmethod task(fun: Callable[Any, Awaitable[None]]) → mode.services.ServiceTask[source]

Decorate function to be used as background task.

Example

>>> class S(Service):
...
...     @Service.task
...     async def background_task(self):
...         while not self.should_stop:
...             await self.sleep(1.0)
...             print('Waking up')
classmethod timer(interval: Union[datetime.timedelta, float, str]) → Callable[Callable[mode.types.services.ServiceT, Awaitable[None]], mode.services.ServiceTask][source]

Background timer executing every n seconds.

Example

>>> class S(Service):
...
...     @Service.timer(1.0)
...     async def background_timer(self):
...         print('Waking up')
classmethod transitions_to(flag: str) → Callable[source]

Decorate function to set and reset diagnostic flag.

async transition_with(flag: str, fut: Awaitable, *args: Any, **kwargs: Any) → Any[source]
add_dependency(service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]

Add dependency to other service.

The service will be started/stopped with this service.

async add_runtime_dependency(service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]
async remove_dependency(service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]

Stop and remove dependency of this service.

async add_async_context(context: AsyncContextManager) → Any[source]
add_context(context: ContextManager) → Any[source]
add_future(coro: Awaitable) → _asyncio.Future[source]

Add relationship to asyncio.Future.

The future will be joined when this service is stopped.

on_init() → None[source]
on_init_dependencies() → Iterable[mode.types.services.ServiceT][source]

Return list of service dependencies for this service.

async join_services(services: Sequence[mode.types.services.ServiceT]) → None[source]
async sleep(n: Union[datetime.timedelta, float, str], *, loop: asyncio.events.AbstractEventLoop = None) → None[source]

Sleep for n seconds, or until service stopped.
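
One way to get "sleep, or wake early when stopped" behaviour is to wait on a stop event with a timeout. The sketch below uses only asyncio; interruptible_sleep and the stopped event are hypothetical names, and this is not necessarily how Service.sleep is implemented.

```python
import asyncio


async def interruptible_sleep(seconds: float, stopped: asyncio.Event) -> bool:
    """Sleep up to `seconds`; return True if `stopped` was set first."""
    try:
        # Waiting on the event returns as soon as it is set.
        await asyncio.wait_for(stopped.wait(), timeout=seconds)
    except asyncio.TimeoutError:
        # The full interval elapsed without a stop request.
        return False
    return True
```

A background loop written with such a helper can exit promptly on shutdown instead of finishing a long sleep first.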

async wait_for_stopped(*coros: Union[Generator[[Any, None], Any], Awaitable, asyncio.locks.Event, mode.utils.locks.Event], timeout: Union[datetime.timedelta, float, str] = None) → bool[source]
async wait(*coros: Union[Generator[[Any, None], Any], Awaitable, asyncio.locks.Event, mode.utils.locks.Event], timeout: Union[datetime.timedelta, float, str] = None) → mode.services.WaitResult[source]

Wait for coroutines to complete, or until the service stops.

async wait_many(coros: Iterable[Union[Generator[[Any, None], Any], Awaitable, asyncio.locks.Event, mode.utils.locks.Event]], *, timeout: Union[datetime.timedelta, float, str] = None) → mode.services.WaitResult[source]
async wait_first(*coros: Union[Generator[[Any, None], Any], Awaitable, asyncio.locks.Event, mode.utils.locks.Event], timeout: Union[datetime.timedelta, float, str] = None) → mode.services.WaitResults[source]
async start() → None[source]
async maybe_start() → None[source]

Start the service, if it has not already been started.

async crash(reason: BaseException) → None[source]

Crash the service and all child services.

async stop() → None[source]

Stop the service.

async restart() → None[source]

Restart this service.

service_reset() → None[source]
async wait_until_stopped() → None[source]

Wait until the service is signalled to stop.

set_shutdown() → None[source]

Set the shutdown signal.

Notes

If wait_for_shutdown is set, stopping the service will wait for this flag to be set.
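
The interaction between wait_for_shutdown, shutdown_timeout and set_shutdown() can be pictured roughly as follows. MiniService is a hypothetical class written for this illustration; the real Service does considerably more, and returning a bool from stop() is an assumption made here to keep the sketch testable.

```python
import asyncio


class MiniService:
    """Hypothetical sketch of a stop() that waits for a shutdown flag."""

    wait_for_shutdown = False
    shutdown_timeout = 60.0

    def __init__(self) -> None:
        # Create instances inside a running event loop.
        self._stopped = asyncio.Event()
        self._shutdown = asyncio.Event()

    def set_shutdown(self) -> None:
        self._shutdown.set()

    async def stop(self) -> bool:
        """Signal stop; return False if the shutdown flag never arrived."""
        self._stopped.set()
        if self.wait_for_shutdown:
            try:
                await asyncio.wait_for(
                    self._shutdown.wait(), timeout=self.shutdown_timeout)
            except asyncio.TimeoutError:
                return False
        return True
```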

itertimer(interval: Union[datetime.timedelta, float, str], *, max_drift_correction: float = 0.1, loop: asyncio.events.AbstractEventLoop = None, sleep: Callable[..., Awaitable] = None, clock: Callable[float] = <built-in function perf_counter>, name: str = '') → AsyncIterator[float][source]

Sleep interval seconds for every iteration.

This is an async iterator that takes advantage of timer_intervals() to act as a timer that stops drift from occurring, and adds a tiny amount of drift to timers so that they don’t start at the same time.

Uses Service.sleep which will bail-out-quick if the service is stopped.

Note

Sleeps for the full interval before returning from the first iteration.

Examples

>>> async for sleep_time in self.itertimer(1.0):
...   print('another second passed, just woke up...')
...   await perform_some_http_request()
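
To illustrate what "stopping drift" means, the sketch below schedules each wakeup against an absolute deadline, so time spent in the loop body shortens the next sleep instead of pushing every later iteration back. fixed_rate_timer is a hypothetical name; this is not mode's timer_intervals(), and it omits both the max_drift_correction limit and the small start-up offset mentioned above.

```python
import asyncio
import time
from typing import AsyncIterator, Callable


async def fixed_rate_timer(
        interval: float, *,
        clock: Callable[[], float] = time.perf_counter,
) -> AsyncIterator[float]:
    """Yield once per `interval`, sleeping against absolute deadlines."""
    next_deadline = clock() + interval
    while True:
        # Sleep only for the time remaining until the deadline.
        sleep_time = max(next_deadline - clock(), 0.0)
        await asyncio.sleep(sleep_time)
        yield sleep_time
        # Advance by a fixed step, independent of how long the body took.
        next_deadline += interval
```

As with itertimer, the first value is yielded only after a full interval has passed.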
property started

Return True if the service was started.

property crashed
property should_stop

Return True if the service must stop.

property state

Service state - as a human readable string.

property label

Label used for graphs.

property shortlabel

Label used for logging.

property beacon

Beacon used to track services in a dependency graph.

logger = <Logger mode.services (WARNING)>
property crash_reason
mode.task(fun: Callable[Any, Awaitable[None]]) → mode.services.ServiceTask

Decorate function to be used as background task.

Example

>>> class S(Service):
...
...     @Service.task
...     async def background_task(self):
...         while not self.should_stop:
...             await self.sleep(1.0)
...             print('Waking up')
mode.timer(interval: Union[datetime.timedelta, float, str]) → Callable[Callable[mode.types.services.ServiceT, Awaitable[None]], mode.services.ServiceTask]

Background timer executing every n seconds.

Example

>>> class S(Service):
...
...     @Service.timer(1.0)
...     async def background_timer(self):
...         print('Waking up')
class mode.BaseSignal(*, name: str = None, owner: Type = None, loop: asyncio.events.AbstractEventLoop = None, default_sender: Any = None, receivers: MutableSet[Any] = None, filter_receivers: MutableMapping[Any, MutableSet[Any]] = None)[source]

Base class for signal/observer pattern.

asdict() → Mapping[str, Any][source]
clone(**kwargs: Any) → mode.types.signals.BaseSignalT[source]
with_default_sender(sender: Any = None) → mode.types.signals.BaseSignalT[source]
unpack_sender_from_args(*args: Any) → Tuple[T, Tuple[Any, ...]][source]
connect(fun: Union[Callable[[T, Any, BaseSignalT, Any], None], Callable[[T, Any, BaseSignalT, Any], Awaitable[None]]] = None, **kwargs: Any) → Callable[source]
disconnect(fun: Union[Callable[[T, Any, BaseSignalT, Any], None], Callable[[T, Any, BaseSignalT, Any], Awaitable[None]]], *, weak: bool = False, sender: Any = None) → None[source]
iter_receivers(sender: T_contra) → Iterable[Union[Callable[[T, Any, mode.types.signals.BaseSignalT, Any], None], Callable[[T, Any, mode.types.signals.BaseSignalT, Any], Awaitable[None]]]][source]
property ident
property label
class mode.Signal(*, name: str = None, owner: Type = None, loop: asyncio.events.AbstractEventLoop = None, default_sender: Any = None, receivers: MutableSet[Any] = None, filter_receivers: MutableMapping[Any, MutableSet[Any]] = None)[source]

Asynchronous signal (using async def functions).

async send(*args: Any, **kwargs: Any) → None[source]
clone(**kwargs: Any) → mode.types.signals.SignalT[source]
with_default_sender(sender: Any = None) → mode.types.signals.SignalT[source]
class mode.SyncSignal(*, name: str = None, owner: Type = None, loop: asyncio.events.AbstractEventLoop = None, default_sender: Any = None, receivers: MutableSet[Any] = None, filter_receivers: MutableMapping[Any, MutableSet[Any]] = None)[source]

Signal that is synchronous (using regular def functions).

send(*args: Any, **kwargs: Any) → None[source]
clone(**kwargs: Any) → mode.types.signals.SyncSignalT[source]
with_default_sender(sender: Any = None) → mode.types.signals.SyncSignalT[source]
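
The signal/observer pattern behind these classes can be sketched in a few lines. MiniSignal is a hypothetical class for illustration only; it ignores weak references, per-sender filtering and default senders, all of which the signatures above suggest the real classes support.

```python
import asyncio
from typing import Any, Awaitable, Callable, List

Receiver = Callable[..., Awaitable[None]]


class MiniSignal:
    """Hypothetical async signal: receivers are awaited in connect order."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._receivers: List[Receiver] = []

    def connect(self, fun: Receiver) -> Receiver:
        """Register `fun` as a receiver; usable as a decorator."""
        self._receivers.append(fun)
        return fun

    def disconnect(self, fun: Receiver) -> None:
        self._receivers.remove(fun)

    async def send(self, sender: Any, *args: Any, **kwargs: Any) -> None:
        # Iterate over a copy so receivers may disconnect while running.
        for receiver in list(self._receivers):
            await receiver(sender, *args, **kwargs)
```

A synchronous variant would look the same with a regular def send() that calls each receiver directly instead of awaiting it.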
class mode.ForfeitOneForAllSupervisor(*services: mode.types.services.ServiceT, max_restarts: Union[datetime.timedelta, float, str] = 100.0, over: Union[datetime.timedelta, float, str] = 1.0, raises: Type[BaseException] = <class 'mode.exceptions.MaxRestartsExceeded'>, replacement: Callable[[mode.types.services.ServiceT, int], Awaitable[mode.types.services.ServiceT]] = None, **kwargs: Any)[source]

If one service in the group crashes, we give up on all of them.

logger = <Logger mode.supervisors (WARNING)>
async restart_services(services: List[mode.types.services.ServiceT]) → None[source]
class mode.ForfeitOneForOneSupervisor(*services: mode.types.services.ServiceT, max_restarts: Union[datetime.timedelta, float, str] = 100.0, over: Union[datetime.timedelta, float, str] = 1.0, raises: Type[BaseException] = <class 'mode.exceptions.MaxRestartsExceeded'>, replacement: Callable[[mode.types.services.ServiceT, int], Awaitable[mode.types.services.ServiceT]] = None, **kwargs: Any)[source]

Supervisor that does not restart a service when it crashes.

async restart_services(services: List[mode.types.services.ServiceT]) → None[source]
logger = <Logger mode.supervisors (WARNING)>
class mode.OneForAllSupervisor(*services: mode.types.services.ServiceT, max_restarts: Union[datetime.timedelta, float, str] = 100.0, over: Union[datetime.timedelta, float, str] = 1.0, raises: Type[BaseException] = <class 'mode.exceptions.MaxRestartsExceeded'>, replacement: Callable[[mode.types.services.ServiceT, int], Awaitable[mode.types.services.ServiceT]] = None, **kwargs: Any)[source]

Supervisor that restarts all services when a service crashes.

async restart_services(services: List[mode.types.services.ServiceT]) → None[source]
logger = <Logger mode.supervisors (WARNING)>
class mode.OneForOneSupervisor(*services: mode.types.services.ServiceT, max_restarts: Union[datetime.timedelta, float, str] = 100.0, over: Union[datetime.timedelta, float, str] = 1.0, raises: Type[BaseException] = <class 'mode.exceptions.MaxRestartsExceeded'>, replacement: Callable[[mode.types.services.ServiceT, int], Awaitable[mode.types.services.ServiceT]] = None, **kwargs: Any)[source]

Supervisor that simply restarts any crashed service.

logger = <Logger mode.supervisors (WARNING)>
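
The four strategies above differ only in what happens to the group after one member crashes. The function below restates the one-line class descriptions as a decision table. The strategy constants and plan_after_crash are hypothetical names; the max_restarts and over limits are not modelled because their exact semantics are not described here.

```python
from typing import Dict, List, Sequence

# Hypothetical strategy names that mirror the class descriptions above.
ONE_FOR_ONE = 'one_for_one'
ONE_FOR_ALL = 'one_for_all'
FORFEIT_ONE_FOR_ONE = 'forfeit_one_for_one'
FORFEIT_ONE_FOR_ALL = 'forfeit_one_for_all'


def plan_after_crash(
        strategy: str,
        services: Sequence[str],
        crashed: str) -> Dict[str, List[str]]:
    """Return which services to restart and which to give up on."""
    if crashed not in services:
        raise ValueError(f'unknown service: {crashed!r}')
    if strategy == ONE_FOR_ONE:
        return {'restart': [crashed], 'give_up': []}
    if strategy == ONE_FOR_ALL:
        return {'restart': list(services), 'give_up': []}
    if strategy == FORFEIT_ONE_FOR_ONE:
        return {'restart': [], 'give_up': [crashed]}
    if strategy == FORFEIT_ONE_FOR_ALL:
        return {'restart': [], 'give_up': list(services)}
    raise ValueError(f'unknown strategy: {strategy!r}')
```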
class mode.SupervisorStrategy(*services: mode.types.services.ServiceT, max_restarts: Union[datetime.timedelta, float, str] = 100.0, over: Union[datetime.timedelta, float, str] = 1.0, raises: Type[BaseException] = <class 'mode.exceptions.MaxRestartsExceeded'>, replacement: Callable[[mode.types.services.ServiceT, int], Awaitable[mode.types.services.ServiceT]] = None, **kwargs: Any)[source]

Base class for all supervisor strategies.

wakeup() → None[source]
add(*services: mode.types.services.ServiceT) → None[source]
discard(*services: mode.types.services.ServiceT) → None[source]
insert(index: int, service: mode.types.services.ServiceT) → None[source]
service_operational(service: mode.types.services.ServiceT) → bool[source]
async run_until_complete() → None[source]
async on_start() → None[source]

Service is starting.

async on_stop() → None[source]

Service is being stopped/restarted.

async start_services(services: List[mode.types.services.ServiceT]) → None[source]
async start_service(service: mode.types.services.ServiceT) → None[source]
async restart_services(services: List[mode.types.services.ServiceT]) → None[source]
async stop_services(services: List[mode.types.services.ServiceT]) → None[source]
async restart_service(service: mode.types.services.ServiceT) → None[source]
logger = <Logger mode.supervisors (WARNING)>
class mode.CrashingSupervisor(*services: mode.types.services.ServiceT, max_restarts: Union[datetime.timedelta, float, str] = 100.0, over: Union[datetime.timedelta, float, str] = 1.0, raises: Type[BaseException] = <class 'mode.exceptions.MaxRestartsExceeded'>, replacement: Callable[[mode.types.services.ServiceT, int], Awaitable[mode.types.services.ServiceT]] = None, **kwargs: Any)[source]

Supervisor that crashes the whole program.

logger = <Logger mode.supervisors (WARNING)>
wakeup() → None[source]
class mode.ServiceT(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None)[source]

Abstract type for an asynchronous service that can be started/stopped.

See also

mode.Service.

wait_for_shutdown = False
restart_count = 0
supervisor = None
abstract add_dependency(service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]
abstract async add_runtime_dependency(service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]
abstract async add_async_context(context: AsyncContextManager) → Any[source]
abstract add_context(context: ContextManager) → Any[source]
abstract async start() → None[source]
abstract async maybe_start() → None[source]
abstract async crash(reason: BaseException) → None[source]
abstract async stop() → None[source]
abstract service_reset() → None[source]
abstract async restart() → None[source]
abstract async wait_until_stopped() → None[source]
abstract set_shutdown() → None[source]
abstract property started
abstract property crashed
abstract property should_stop
abstract property state
abstract property label
abstract property shortlabel
property beacon
abstract property loop
abstract property crash_reason
class mode.BaseSignalT(*, name: str = None, owner: Type = None, loop: asyncio.events.AbstractEventLoop = None, default_sender: Any = None, receivers: MutableSet[Any] = None, filter_receivers: MutableMapping[Any, MutableSet[Any]] = None)[source]

Base type for all signals.

abstract clone(**kwargs: Any) → mode.types.signals.BaseSignalT[source]
abstract with_default_sender(sender: Any = None) → mode.types.signals.BaseSignalT[source]
abstract connect(fun: Union[Callable[[T, Any, BaseSignalT, Any], None], Callable[[T, Any, BaseSignalT, Any], Awaitable[None]]], **kwargs: Any) → Callable[source]
abstract disconnect(fun: Union[Callable[[T, Any, BaseSignalT, Any], None], Callable[[T, Any, BaseSignalT, Any], Awaitable[None]]], *, sender: Any = None, weak: bool = True) → None[source]
class mode.SignalT(*, name: str = None, owner: Type = None, loop: asyncio.events.AbstractEventLoop = None, default_sender: Any = None, receivers: MutableSet[Any] = None, filter_receivers: MutableMapping[Any, MutableSet[Any]] = None)[source]

Base class for all async signals (using async def).

abstract async send(sender: T_contra, *args: Any, **kwargs: Any) → None[source]
abstract clone(**kwargs: Any) → SignalT[source]
abstract with_default_sender(sender: Any = None) → SignalT[source]
class mode.SyncSignalT(*, name: str = None, owner: Type = None, loop: asyncio.events.AbstractEventLoop = None, default_sender: Any = None, receivers: MutableSet[Any] = None, filter_receivers: MutableMapping[Any, MutableSet[Any]] = None)[source]

Base class for all synchronous signals (using regular def).

abstract send(sender: T_contra, *args: Any, **kwargs: Any) → None[source]
abstract clone(**kwargs: Any) → SyncSignalT[source]
abstract with_default_sender(sender: Any = None) → SyncSignalT[source]
class mode.SupervisorStrategyT(*services: mode.types.supervisors.ServiceT, max_restarts: Union[datetime.timedelta, float, str] = 100.0, over: Union[datetime.timedelta, float, str] = 1.0, raises: Type[BaseException] = None, replacement: Callable[[mode.types.supervisors.ServiceT, int], Awaitable[mode.types.supervisors.ServiceT]] = None, **kwargs: Any)[source]

Base type for all supervisor strategies.

abstract wakeup() → None[source]
abstract add(*services: mode.types.supervisors.ServiceT) → None[source]
abstract discard(*services: mode.types.supervisors.ServiceT) → None[source]
abstract service_operational(service: mode.types.supervisors.ServiceT) → bool[source]
abstract async restart_service(service: mode.types.supervisors.ServiceT) → None[source]
mode.want_seconds(s: float) → float[source]

Convert Seconds to float.
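
Many signatures on this page accept Union[datetime.timedelta, float, str] for durations. A conversion of that kind might look like the sketch below. to_seconds is a hypothetical helper, not mode's want_seconds, and it handles only timedelta and numeric input because the accepted string format is not described here.

```python
from datetime import timedelta
from typing import Union


def to_seconds(s: Union[timedelta, float, int]) -> float:
    """Convert a timedelta or a number of seconds to a float."""
    if isinstance(s, timedelta):
        return s.total_seconds()
    return float(s)
```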

class mode.flight_recorder(logger: Any, *, timeout: Union[datetime.timedelta, float, str], loop: asyncio.events.AbstractEventLoop = None)[source]

Flight Recorder context for use with the with statement.

This is a logging utility to log stuff only when something times out.

For example, if you have a background task that is sometimes hanging:

class RedisCache(mode.Service):

    @mode.timer(1.0)
    async def _background_refresh(self) -> None:
        self._users = await self.redis_client.get(USER_KEY)
        self._posts = await self.redis_client.get(POSTS_KEY)

You want to find out which line is hanging, but logging all the time produces far too much output. It also changes how fast the program runs, which can mask race conditions so that they never occur.

Use the flight recorder to save the logs and only log when it times out:

logger = mode.get_logger(__name__)

class RedisCache(mode.Service):

    @mode.timer(1.0)
    async def _background_refresh(self) -> None:
        with mode.flight_recorder(logger, timeout=10.0) as on_timeout:
            on_timeout.info(f'+redis_client.get({USER_KEY!r})')
            await self.redis_client.get(USER_KEY)
            on_timeout.info(f'-redis_client.get({USER_KEY!r})')

            on_timeout.info(f'+redis_client.get({POSTS_KEY!r})')
            await self.redis_client.get(POSTS_KEY)
            on_timeout.info(f'-redis_client.get({POSTS_KEY!r})')

If the body of this with statement completes before the timeout, the logs are forgotten about and never emitted – if it takes more than ten seconds to complete, we will see these messages in the log:

[2018-04-19 09:43:55,877: WARNING]: Warning: Task timed out!
[2018-04-19 09:43:55,878: WARNING]:
    Please make sure it is hanging before restarting.
[2018-04-19 09:43:55,878: INFO]: [Flight Recorder-1]
    (started at Thu Apr 19 09:43:45 2018) Replaying logs...
[2018-04-19 09:43:55,878: INFO]: [Flight Recorder-1]
    (Thu Apr 19 09:43:45 2018) +redis_client.get('user')
[2018-04-19 09:43:55,878: INFO]: [Flight Recorder-1]
    (Thu Apr 19 09:43:49 2018) -redis_client.get('user')
[2018-04-19 09:43:55,878: INFO]: [Flight Recorder-1]
    (Thu Apr 19 09:43:46 2018) +redis_client.get('posts')
[2018-04-19 09:43:55,878: INFO]: [Flight Recorder-1] -End of log-

Now we know this redis_client.get call can take too long to complete, and should consider adding a timeout to it.
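
The buffering idea can be sketched with the standard library: log calls are stored in memory, and a timer replays them only if the with block is still running when the timeout expires. BufferedRecorder is a hypothetical class for illustration, not mode's flight_recorder, and its log output format is invented for this sketch.

```python
import asyncio
import logging
import time
from typing import Any, List, Optional, Tuple


class BufferedRecorder:
    """Hypothetical sketch: buffer log calls, replay them only on timeout."""

    def __init__(self, logger: logging.Logger, *, timeout: float) -> None:
        self.logger = logger
        self.timeout = timeout
        self._buffer: List[Tuple[float, int, str]] = []
        self._handle: Optional[asyncio.TimerHandle] = None

    def info(self, message: str) -> None:
        # Nothing is emitted yet; the message is only remembered.
        self._buffer.append((time.time(), logging.INFO, message))

    def _replay(self) -> None:
        self.logger.warning('Task timed out, replaying logs...')
        for created, severity, message in self._buffer:
            self.logger.log(
                severity, '(%s) %s', time.ctime(created), message)

    def __enter__(self) -> 'BufferedRecorder':
        # Must be entered while an event loop is running.
        loop = asyncio.get_running_loop()
        self._handle = loop.call_later(self.timeout, self._replay)
        return self

    def __exit__(self, *exc_info: Any) -> None:
        # Finished in time (or failed): cancel the timer, drop the logs.
        if self._handle is not None:
            self._handle.cancel()
        self._buffer.clear()
```

Because the timer callback runs on the event loop, it can only fire while the body is suspended at an await. Code that blocks the loop would need a different detection mechanism.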

wrap_debug(obj: Any) → mode.utils.logging.Logwrapped[source]
wrap_info(obj: Any) → mode.utils.logging.Logwrapped[source]
wrap_warn(obj: Any) → mode.utils.logging.Logwrapped[source]
wrap_error(obj: Any) → mode.utils.logging.Logwrapped[source]
wrap(severity: int, obj: Any) → mode.utils.logging.Logwrapped[source]
activate() → None[source]
cancel() → None[source]
log(severity: int, message: str, *args: Any, **kwargs: Any) → None[source]
mode.get_logger(name: str) → logging.Logger[source]

Get logger by name.

mode.setup_logging(*, loglevel: Union[str, int] = None, logfile: Union[str, IO] = None, loghandlers: List[logging.StreamHandler] = None, logging_config: Dict = None) → int[source]

Configure logging subsystem.

mode.label(s: Any) → str[source]

Return the name of an object as string.

mode.shortlabel(s: Any) → str[source]

Return the shortened name of an object as string.

class mode.Worker(*services: mode.types.services.ServiceT, debug: bool = False, quiet: bool = False, logging_config: Dict = None, loglevel: Union[str, int] = None, logfile: Union[str, IO] = None, redirect_stdouts: bool = True, redirect_stdouts_level: Union[int, str] = None, stdout: IO = <_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>, stderr: IO = <_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'>, console_port: int = 50101, loghandlers: List[logging.StreamHandler] = None, blocking_timeout: Union[datetime.timedelta, float, str] = 10.0, loop: asyncio.events.AbstractEventLoop = None, override_logging: bool = True, daemon: bool = True, **kwargs: Any)[source]

Start mode service from the command-line.

BLOCK_DETECTOR = 'mode.debug:BlockingDetector'
say(msg: str) → None[source]

Write message to standard output.

carp(msg: str) → None[source]

Write warning to standard error.

on_init_dependencies() → Iterable[mode.types.services.ServiceT][source]

Return list of service dependencies for this service.

async on_first_start() → None[source]

Service started for the first time in this process.

async default_on_first_start() → None[source]
async on_execute() → None[source]
on_setup_root_logger(logger: logging.Logger, level: int) → None[source]
async maybe_start_blockdetection() → None[source]
install_signal_handlers() → None[source]
logger = <Logger mode.worker (WARNING)>
execute_from_commandline() → NoReturn[source]
on_worker_shutdown() → None[source]
stop_and_shutdown() → None[source]
async on_started() → None[source]

Service has started.

property blocking_detector