mode

AsyncIO Service-based programming.

class mode.Service(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None)[source]

An asyncio service that can be started/stopped/restarted.

Keyword Arguments:
 
abstract = False
class Diag(service: mode.types.services.ServiceT)

Service diagnostics.

This can be used to track what your service is doing. For example, if your service is a Kafka consumer with a background task that commits the offset every 30 seconds, you may want to see when this happens:

DIAG_COMMITTING = 'committing'

class Consumer(Service):

    @Service.task
    async def _background_commit(self) -> None:
        while not self.should_stop:
            await self.sleep(30.0)
            self.diag.set_flag(DIAG_COMMITTING)
            try:
                await self._consumer.commit()
            finally:
                self.diag.unset_flag(DIAG_COMMITTING)

The above code is setting the flag manually, but you can also use a decorator to accomplish the same thing:

@Service.timer(30.0)
async def _background_commit(self) -> None:
    await self.commit()

@Service.transitions_to(DIAG_COMMITTING)
async def commit(self) -> None:
    await self._consumer.commit()
set_flag(flag: str) → None
unset_flag(flag: str) → None
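The flag mechanics described above can be sketched with the standard library alone. This is a minimal illustration and does not use mode itself: FlagSet, with_flag and Committer are hypothetical names, and the real Diag class may track more state than a plain set.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable, Set


class FlagSet:
    """Hypothetical stand-in for a diagnostics object; not mode's Diag."""

    def __init__(self) -> None:
        self.flags: Set[str] = set()

    def set_flag(self, flag: str) -> None:
        self.flags.add(flag)

    def unset_flag(self, flag: str) -> None:
        self.flags.discard(flag)


def with_flag(flag: str) -> Callable:
    """Decorator: keep `flag` set while the wrapped coroutine method runs."""
    def decorator(fun: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
        @functools.wraps(fun)
        async def wrapper(self, *args: Any, **kwargs: Any) -> Any:
            self.diag.set_flag(flag)
            try:
                return await fun(self, *args, **kwargs)
            finally:
                # Always clear the flag, even if the call raises.
                self.diag.unset_flag(flag)
        return wrapper
    return decorator


class Committer:
    """Hypothetical object that owns a FlagSet as `diag`."""

    def __init__(self) -> None:
        self.diag = FlagSet()
        self.seen_during_commit: Set[str] = set()

    @with_flag('committing')
    async def commit(self) -> None:
        # Record which flags are active while the commit is in progress.
        self.seen_during_commit = set(self.diag.flags)
        await asyncio.sleep(0)


committer = Committer()
asyncio.run(committer.commit())
```

The try/finally in the wrapper mirrors the manual version in the first example: the flag is cleared whether or not the wrapped call succeeds.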
wait_for_shutdown = False

Set to True if .stop must wait for the shutdown flag to be set.

shutdown_timeout = 60.0

Time to wait for the shutdown flag to be set before giving up.

restart_count = 0

Current number of times this service instance has been restarted.

mundane_level = 'info'

The log level for mundane events such as starting and stopping. Set this to "debug" to keep these messages out of info-level logs.

classmethod from_awaitable(coro: Awaitable, *, name: str = None, **kwargs) → mode.types.services.ServiceT[source]
classmethod task(fun: Callable[Any, Awaitable[None]]) → mode.services.ServiceTask[source]

Decorator used to define a service background task.

Example

>>> class S(Service):
...
...     @Service.task
...     async def background_task(self):
...         while not self.should_stop:
...             await self.sleep(1.0)
...             print('Waking up')
classmethod timer(interval: Union[datetime.timedelta, float, str]) → Callable[Callable[mode.types.services.ServiceT, Awaitable[None]], mode.services.ServiceTask][source]

A background timer that executes every n seconds.

Example

>>> class S(Service):
...
...     @Service.timer(1.0)
...     async def background_timer(self):
...         print('Waking up')
classmethod transitions_to(flag: str) → Callable[source]

Decorator that sets a diagnostic flag while the function is running.

add_dependency(service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]

Add a dependency on another service.

The dependency will be started and stopped together with this service.
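As a rough illustration of what "started/stopped with this service" means, here is a toy model written with the standard library only. ToyService is a hypothetical class, and the ordering it uses (parent first on start, dependencies in reverse on stop) is an assumption made for the sketch, not a statement about mode's actual ordering.

```python
import asyncio
from typing import List


class ToyService:
    """Hypothetical toy service; only models dependency start/stop."""

    def __init__(self, name: str, log: List[str]) -> None:
        self.name = name
        self.log = log
        self.children: List['ToyService'] = []

    def add_dependency(self, service: 'ToyService') -> 'ToyService':
        # Remember the dependency and hand it back to the caller.
        self.children.append(service)
        return service

    async def start(self) -> None:
        self.log.append(f'start {self.name}')
        for child in self.children:
            await child.start()

    async def stop(self) -> None:
        # Assumed ordering: stop dependencies in reverse, then the parent.
        for child in reversed(self.children):
            await child.stop()
        self.log.append(f'stop {self.name}')


async def main() -> List[str]:
    log: List[str] = []
    app = ToyService('app', log)
    app.add_dependency(ToyService('cache', log))
    app.add_dependency(ToyService('consumer', log))
    await app.start()
    await app.stop()
    return log


events = asyncio.run(main())
```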

add_context(context: ContextManager) → Any[source]
add_future(coro: Awaitable) → _asyncio.Future[source]

Add relationship to asyncio.Future.

The future will be joined when this service is stopped.

on_init() → None[source]
on_init_dependencies() → Iterable[mode.types.services.ServiceT][source]

Callback to be used to add service dependencies.

coroutine add_async_context(context: AsyncContextManager) → Any[source]
coroutine add_runtime_dependency(service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]
coroutine crash(reason: BaseException) → None[source]

Crash the service and all child services.

coroutine join_services(services: Sequence[mode.types.services.ServiceT]) → None[source]
logger = <Logger mode.services (WARNING)>
coroutine maybe_start() → None[source]

Start the service, if it has not already been started.

coroutine restart() → None[source]

Restart this service.

service_reset() → None[source]
coroutine sleep(n: Union[datetime.timedelta, float, str]) → None[source]

Sleep for n seconds, or until service stopped.
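One way to picture a sleep that ends early when the service stops is to wait on a stop event with a timeout. The sketch below uses plain asyncio; sleep_or_stop and the stopped event are hypothetical names, and this is not necessarily how mode implements Service.sleep.

```python
import asyncio
import time
from typing import Tuple


async def sleep_or_stop(n: float, stopped: asyncio.Event) -> bool:
    """Sleep for `n` seconds; return True if `stopped` was set first."""
    try:
        await asyncio.wait_for(stopped.wait(), timeout=n)
    except asyncio.TimeoutError:
        return False  # slept for the full duration
    return True  # woken up early by the stop signal


async def main() -> Tuple[bool, float]:
    stopped = asyncio.Event()
    loop = asyncio.get_running_loop()
    # Request a stop shortly after the long sleep begins.
    loop.call_later(0.01, stopped.set)
    started = time.monotonic()
    was_interrupted = await sleep_or_stop(30.0, stopped)
    return was_interrupted, time.monotonic() - started


interrupted, elapsed = asyncio.run(main())
```

This is why the task examples call self.sleep rather than asyncio.sleep: a loop built on an interruptible sleep can notice a stop request without waiting out the full interval.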

coroutine start() → None[source]
coroutine stop() → None[source]

Stop the service.

coroutine transition_with(flag: str, fut: Awaitable, *args, **kwargs) → Any[source]
coroutine wait(*coros, timeout: Union[datetime.timedelta, float, str] = None) → mode.services.WaitResult[source]

Wait for coroutines to complete, or until the service stops.
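The idea of waiting for work "or until the service stops" can be sketched by racing the work against a stop event. This uses only asyncio; wait_or_stopped is a hypothetical helper, and cancelling the unfinished work on stop is a choice made for this sketch rather than documented mode behaviour.

```python
import asyncio
from typing import Awaitable, Tuple


async def wait_or_stopped(coro: Awaitable, stopped: asyncio.Event) -> bool:
    """Return True if `coro` finished, False if the stop signal came first."""
    work = asyncio.ensure_future(coro)
    stop = asyncio.ensure_future(stopped.wait())
    try:
        # Resume as soon as either the work or the stop signal completes.
        await asyncio.wait({work, stop}, return_when=asyncio.FIRST_COMPLETED)
        return work.done()
    finally:
        # Clean up whichever future is still pending.
        for fut in (work, stop):
            if not fut.done():
                fut.cancel()


async def main() -> Tuple[bool, bool]:
    stopped = asyncio.Event()
    # Case 1: the work completes and no stop is requested.
    completed = await wait_or_stopped(asyncio.sleep(0.01), stopped)
    # Case 2: a stop is requested while the work is still running.
    asyncio.get_running_loop().call_later(0.01, stopped.set)
    completed_late = await wait_or_stopped(asyncio.sleep(30.0), stopped)
    return completed, completed_late


first_done, second_done = asyncio.run(main())
```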

coroutine wait_first(*coros, timeout: Union[datetime.timedelta, float, str] = None) → mode.services.WaitResults[source]
coroutine wait_for_stopped(*coros, timeout: Union[datetime.timedelta, float, str] = None) → bool[source]
coroutine wait_many(coros: Iterable[Union[Generator[[Any, None], Any], Awaitable, asyncio.locks.Event, mode.utils.locks.Event]], *, timeout: Union[datetime.timedelta, float, str] = None) → mode.services.WaitResult[source]
coroutine wait_until_stopped() → None[source]

Wait until the service is signalled to stop.

set_shutdown() → None[source]

Set the shutdown signal.

Notes

If wait_for_shutdown is set, stopping the service will wait for this flag to be set.
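The interplay between wait_for_shutdown, shutdown_timeout and set_shutdown can be modelled as follows. This is a toy sketch under stated assumptions: ToyService is hypothetical, and the real stop method does considerably more than wait on a flag.

```python
import asyncio
from typing import List, Tuple


class ToyService:
    """Hypothetical toy service; only models the shutdown handshake."""

    wait_for_shutdown = True
    shutdown_timeout = 1.0

    def __init__(self) -> None:
        self._shutdown = asyncio.Event()
        self.log: List[str] = []

    def set_shutdown(self) -> None:
        self._shutdown.set()

    async def stop(self) -> None:
        self.log.append('stopping')
        if self.wait_for_shutdown:
            try:
                # Block until set_shutdown() is called, up to the timeout.
                await asyncio.wait_for(
                    self._shutdown.wait(), timeout=self.shutdown_timeout)
                self.log.append('shutdown flag set')
            except asyncio.TimeoutError:
                self.log.append('gave up waiting')
        self.log.append('stopped')


async def main() -> Tuple[List[str], List[str]]:
    # Case 1: something calls set_shutdown() while stop() is waiting.
    confirmed = ToyService()
    asyncio.get_running_loop().call_later(0.01, confirmed.set_shutdown)
    await confirmed.stop()

    # Case 2: nobody sets the flag, so stop() gives up after the timeout.
    ignored = ToyService()
    ignored.shutdown_timeout = 0.01
    await ignored.stop()
    return confirmed.log, ignored.log


confirmed_log, ignored_log = asyncio.run(main())
```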

started

Was the service started?

crashed
should_stop

Should the service stop ASAP?

state

Current service state - as a human readable string.

label

Label used for graphs.

shortlabel

Label used for logging.

beacon

Beacon used to track services in a dependency graph.

class mode.BaseSignal(*, name: str = None, owner: Type = None, loop: asyncio.events.AbstractEventLoop = None, default_sender: Any = None, receivers: MutableSet[Any] = None, filter_receivers: MutableMapping[Any, MutableSet[Any]] = None)[source]
asdict() → Mapping[str, Any][source]
clone(**kwargs) → mode.types.signals.BaseSignalT[source]
with_default_sender(sender: Any = None) → mode.types.signals.BaseSignalT[source]
unpack_sender_from_args(*args) → Tuple[T, Tuple[Any, ...]][source]
connect(fun: Union[Callable[[T, Any, BaseSignalT, Any], None], Callable[[T, Any, BaseSignalT, Any], Awaitable[None]]] = None, **kwargs) → Callable[source]
disconnect(fun: Union[Callable[[T, Any, BaseSignalT, Any], None], Callable[[T, Any, BaseSignalT, Any], Awaitable[None]]], *, weak: bool = True, sender: Any = None) → None[source]
iter_receivers(sender: T_contra) → Iterable[Union[Callable[[T, Any, mode.types.signals.BaseSignalT, Any], None], Callable[[T, Any, mode.types.signals.BaseSignalT, Any], Awaitable[None]]]][source]
ident
class mode.Signal(*, name: str = None, owner: Type = None, loop: asyncio.events.AbstractEventLoop = None, default_sender: Any = None, receivers: MutableSet[Any] = None, filter_receivers: MutableMapping[Any, MutableSet[Any]] = None)[source]
clone(**kwargs) → mode.types.signals.SignalT[source]
with_default_sender(sender: Any = None) → mode.types.signals.SignalT[source]
coroutine send(*args, **kwargs) → None[source]
class mode.SyncSignal(*, name: str = None, owner: Type = None, loop: asyncio.events.AbstractEventLoop = None, default_sender: Any = None, receivers: MutableSet[Any] = None, filter_receivers: MutableMapping[Any, MutableSet[Any]] = None)[source]
send(*args, **kwargs) → None[source]
clone(**kwargs) → mode.types.signals.SyncSignalT[source]
with_default_sender(sender: Any = None) → mode.types.signals.SyncSignalT[source]
class mode.ForfeitOneForAllSupervisor(*services, max_restarts: Union[datetime.timedelta, float, str] = 100.0, over: Union[datetime.timedelta, float, str] = 1.0, raises: Type[BaseException] = <class 'mode.exceptions.MaxRestartsExceeded'>, replacement: Callable[[mode.types.services.ServiceT, int], Awaitable[mode.types.services.ServiceT]] = None, **kwargs)[source]

If one service in the group crashes, we give up on all of them.

logger = <Logger mode.supervisors (WARNING)>
coroutine restart_services(services: List[mode.types.services.ServiceT]) → None[source]
class mode.ForfeitOneForOneSupervisor(*services, max_restarts: Union[datetime.timedelta, float, str] = 100.0, over: Union[datetime.timedelta, float, str] = 1.0, raises: Type[BaseException] = <class 'mode.exceptions.MaxRestartsExceeded'>, replacement: Callable[[mode.types.services.ServiceT, int], Awaitable[mode.types.services.ServiceT]] = None, **kwargs)[source]

Supervisor that does not restart a service when it crashes.

logger = <Logger mode.supervisors (WARNING)>
coroutine restart_services(services: List[mode.types.services.ServiceT]) → None[source]
class mode.OneForAllSupervisor(*services, max_restarts: Union[datetime.timedelta, float, str] = 100.0, over: Union[datetime.timedelta, float, str] = 1.0, raises: Type[BaseException] = <class 'mode.exceptions.MaxRestartsExceeded'>, replacement: Callable[[mode.types.services.ServiceT, int], Awaitable[mode.types.services.ServiceT]] = None, **kwargs)[source]
logger = <Logger mode.supervisors (WARNING)>
coroutine restart_services(services: List[mode.types.services.ServiceT]) → None[source]
class mode.OneForOneSupervisor(*services, max_restarts: Union[datetime.timedelta, float, str] = 100.0, over: Union[datetime.timedelta, float, str] = 1.0, raises: Type[BaseException] = <class 'mode.exceptions.MaxRestartsExceeded'>, replacement: Callable[[mode.types.services.ServiceT, int], Awaitable[mode.types.services.ServiceT]] = None, **kwargs)[source]
logger = <Logger mode.supervisors (WARNING)>
class mode.SupervisorStrategy(*services, max_restarts: Union[datetime.timedelta, float, str] = 100.0, over: Union[datetime.timedelta, float, str] = 1.0, raises: Type[BaseException] = <class 'mode.exceptions.MaxRestartsExceeded'>, replacement: Callable[[mode.types.services.ServiceT, int], Awaitable[mode.types.services.ServiceT]] = None, **kwargs)[source]
wakeup() → None[source]
add(*services) → None[source]
discard(*services) → None[source]
insert(index: int, service: mode.types.services.ServiceT) → None[source]
service_operational(service: mode.types.services.ServiceT) → bool[source]
logger = <Logger mode.supervisors (WARNING)>
coroutine on_start() → None[source]

Called every time before the service is started/restarted.

coroutine on_stop() → None[source]

Called every time before the service is stopped/restarted.

coroutine restart_service(service: mode.types.services.ServiceT) → None[source]
coroutine restart_services(services: List[mode.types.services.ServiceT]) → None[source]
coroutine run_until_complete() → None[source]
coroutine start_service(service: mode.types.services.ServiceT) → None[source]
coroutine start_services(services: List[mode.types.services.ServiceT]) → None[source]
coroutine stop_services(services: List[mode.types.services.ServiceT]) → None[source]
class mode.CrashingSupervisor(*services, max_restarts: Union[datetime.timedelta, float, str] = 100.0, over: Union[datetime.timedelta, float, str] = 1.0, raises: Type[BaseException] = <class 'mode.exceptions.MaxRestartsExceeded'>, replacement: Callable[[mode.types.services.ServiceT, int], Awaitable[mode.types.services.ServiceT]] = None, **kwargs)[source]
logger = <Logger mode.supervisors (WARNING)>
wakeup() → None[source]
class mode.ServiceT(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None)[source]

Abstract type for an asynchronous service that can be started/stopped.

See also

mode.Service.

wait_for_shutdown = False
restart_count = 0
supervisor = None
add_dependency(service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]
add_context(context: ContextManager) → Any[source]
service_reset() → None[source]
set_shutdown() → None[source]
started
crashed
should_stop
state
label
shortlabel
beacon
coroutine add_async_context(context: AsyncContextManager) → Any[source]
coroutine add_runtime_dependency(service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]
coroutine crash(reason: BaseException) → None[source]
coroutine maybe_start() → None[source]
coroutine restart() → None[source]
coroutine start() → None[source]
coroutine stop() → None[source]
coroutine wait_until_stopped() → None[source]
loop
class mode.BaseSignalT(*, name: str = None, owner: Type = None, loop: asyncio.events.AbstractEventLoop = None, default_sender: Any = None, receivers: MutableSet[Any] = None, filter_receivers: MutableMapping[Any, MutableSet[Any]] = None)[source]
clone(**kwargs) → mode.types.signals.BaseSignalT[source]
with_default_sender(sender: Any = None) → mode.types.signals.BaseSignalT[source]
connect(fun: Union[Callable[[T, Any, BaseSignalT, Any], None], Callable[[T, Any, BaseSignalT, Any], Awaitable[None]]], **kwargs) → Callable[source]
disconnect(fun: Union[Callable[[T, Any, BaseSignalT, Any], None], Callable[[T, Any, BaseSignalT, Any], Awaitable[None]]], *, sender: Any = None, weak: bool = True) → None[source]
class mode.SignalT(*, name: str = None, owner: Type = None, loop: asyncio.events.AbstractEventLoop = None, default_sender: Any = None, receivers: MutableSet[Any] = None, filter_receivers: MutableMapping[Any, MutableSet[Any]] = None)[source]
clone(**kwargs) → SignalT[source]
with_default_sender(sender: Any = None) → SignalT[source]
coroutine send(sender: T_contra, *args, **kwargs) → None[source]
class mode.SyncSignalT(*, name: str = None, owner: Type = None, loop: asyncio.events.AbstractEventLoop = None, default_sender: Any = None, receivers: MutableSet[Any] = None, filter_receivers: MutableMapping[Any, MutableSet[Any]] = None)[source]
send(sender: T_contra, *args, **kwargs) → None[source]
clone(**kwargs) → SyncSignalT[source]
with_default_sender(sender: Any = None) → SyncSignalT[source]
class mode.SupervisorStrategyT(*services, max_restarts: Union[datetime.timedelta, float, str] = 100.0, over: Union[datetime.timedelta, float, str] = 1.0, raises: Type[BaseException] = None, replacement: Callable[[mode.types.supervisors.ServiceT, int], Awaitable[mode.types.supervisors.ServiceT]] = None, **kwargs)[source]
wakeup() → None[source]
add(*services) → None[source]
discard(*services) → None[source]
service_operational(service: mode.types.supervisors.ServiceT) → bool[source]
coroutine restart_service(service: mode.types.supervisors.ServiceT) → None[source]
mode.want_seconds(s: float) → float[source]

Convert Seconds to float.
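Many signatures in this module accept Union[datetime.timedelta, float, str] for durations. A minimal sketch of the normalization step, covering only timedelta and numeric inputs, might look like the following. The name to_seconds is hypothetical, and string inputs are deliberately left out because their format is not described here.

```python
from datetime import timedelta
from typing import Union


def to_seconds(s: Union[timedelta, float, int]) -> float:
    """Normalize a duration to a float number of seconds (sketch only)."""
    if isinstance(s, timedelta):
        return s.total_seconds()
    # Numbers are taken to already be in seconds.
    return float(s)


ninety = to_seconds(timedelta(minutes=1, seconds=30))
two = to_seconds(2)
```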

class mode.flight_recorder(logger: Any, *, timeout: Union[datetime.timedelta, float, str], loop: asyncio.events.AbstractEventLoop = None)[source]

Flight recorder context manager, for use in a with statement.

This logging utility buffers messages and emits them only when something times out.

For example, if you have a background task that sometimes hangs:

class RedisCache(mode.Service):

    @mode.timer(1.0)
    async def _background_refresh(self) -> None:
        self._users = await self.redis_client.get(USER_KEY)
        self._posts = await self.redis_client.get(POSTS_KEY)

You want to find out which line is hanging, but logging all the time produces far too much output. It also changes how fast the program runs, which can mask race conditions so that they never occur.

Use the flight recorder to save the logs and only log when it times out:

logger = mode.get_logger(__name__)

class RedisCache(mode.Service):

    @mode.timer(1.0)
    async def _background_refresh(self) -> None:
        with mode.flight_recorder(logger, timeout=10.0) as on_timeout:
            on_timeout.info(f'+redis_client.get({USER_KEY!r})')
            await self.redis_client.get(USER_KEY)
            on_timeout.info(f'-redis_client.get({USER_KEY!r})')

            on_timeout.info(f'+redis_client.get({POSTS_KEY!r})')
            await self.redis_client.get(POSTS_KEY)
            on_timeout.info(f'-redis_client.get({POSTS_KEY!r})')

If the body of this with statement completes before the timeout, the logs are discarded and never emitted. If it takes more than ten seconds to complete, these messages appear in the log:

[2018-04-19 09:43:55,877: WARNING]: Warning: Task timed out!
[2018-04-19 09:43:55,878: WARNING]:
    Please make sure it is hanging before restarting.
[2018-04-19 09:43:55,878: INFO]: [Flight Recorder-1]
    (started at Thu Apr 19 09:43:45 2018) Replaying logs...
[2018-04-19 09:43:55,878: INFO]: [Flight Recorder-1]
    (Thu Apr 19 09:43:45 2018) +redis_client.get('user')
[2018-04-19 09:43:55,878: INFO]: [Flight Recorder-1]
    (Thu Apr 19 09:43:49 2018) -redis_client.get('user')
[2018-04-19 09:43:55,878: INFO]: [Flight Recorder-1]
    (Thu Apr 19 09:43:46 2018) +redis_client.get('posts')
[2018-04-19 09:43:55,878: INFO]: [Flight Recorder-1] -End of log-

Now we know that the redis_client.get call for POSTS_KEY can take too long to complete, and we should consider adding a timeout to it.
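The buffering behaviour described above can be approximated with the standard library. The sketch below is not mode's implementation: BufferedRecorder, ListHandler and demo are hypothetical names, and unlike the real output it does not record a timestamp for each buffered message.

```python
import asyncio
import logging
from typing import List, Optional, Tuple


class BufferedRecorder:
    """Hypothetical toy flight recorder; not mode's implementation."""

    def __init__(self, logger: logging.Logger, *, timeout: float) -> None:
        self.logger = logger
        self.timeout = timeout
        self._buffer: List[Tuple[int, str]] = []
        self._timed_out = False
        self._handle: Optional[asyncio.TimerHandle] = None

    def info(self, message: str) -> None:
        if self._timed_out:
            # Already past the deadline: log immediately.
            self.logger.info(message)
        else:
            # Still within the deadline: remember the message for later.
            self._buffer.append((logging.INFO, message))

    def _on_timeout(self) -> None:
        self._timed_out = True
        self.logger.warning('Task timed out, replaying logs')
        for severity, message in self._buffer:
            self.logger.log(severity, message)
        self._buffer.clear()

    def __enter__(self) -> 'BufferedRecorder':
        loop = asyncio.get_running_loop()
        self._handle = loop.call_later(self.timeout, self._on_timeout)
        return self

    def __exit__(self, *exc_info: object) -> None:
        # Body finished: cancel the pending replay and forget the buffer.
        if self._handle is not None:
            self._handle.cancel()
        self._buffer.clear()


class ListHandler(logging.Handler):
    """Collects emitted messages so the demo can inspect them."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


async def demo(name: str, work: float, timeout: float) -> List[str]:
    handler = ListHandler()
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    logger.addHandler(handler)
    with BufferedRecorder(logger, timeout=timeout) as on_timeout:
        on_timeout.info('+work')
        await asyncio.sleep(work)
        on_timeout.info('-work')
    return handler.messages


fast = asyncio.run(demo('sketch.fast', work=0.0, timeout=5.0))
slow = asyncio.run(demo('sketch.slow', work=0.05, timeout=0.01))
```

In the fast run nothing is emitted. In the slow run the timer fires while the body is still sleeping, so the buffered message is replayed and later messages are logged directly.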

wrap_debug(obj: Any) → mode.utils.logging.Logwrapped[source]
wrap_info(obj: Any) → mode.utils.logging.Logwrapped[source]
wrap_warn(obj: Any) → mode.utils.logging.Logwrapped[source]
wrap_error(obj: Any) → mode.utils.logging.Logwrapped[source]
wrap(severity: int, obj: Any) → mode.utils.logging.Logwrapped[source]
activate() → None[source]
cancel() → None[source]
log(severity: int, message: str, *args, **kwargs) → None[source]
mode.get_logger(name: str) → logging.Logger[source]

Get logger by name.

mode.setup_logging(*, loglevel: Union[str, int] = None, logfile: Union[str, IO] = None, loghandlers: List[logging.StreamHandler] = None, logging_config: Dict = None) → int[source]

Set up logging to a file or stream.

mode.label(s: Any) → str[source]
mode.shortlabel(s: Any) → str[source]
class mode.Worker(*services, debug: bool = False, quiet: bool = False, logging_config: Dict = None, loglevel: Union[str, int] = None, logfile: Union[str, IO] = None, redirect_stdouts: bool = True, redirect_stdouts_level: Union[int, str] = None, stdout: IO = <_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>, stderr: IO = <_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'>, console_port: int = 50101, loghandlers: List[logging.StreamHandler] = None, blocking_timeout: Union[datetime.timedelta, float, str] = 10.0, loop: asyncio.events.AbstractEventLoop = None, daemon: bool = True, **kwargs)[source]
say(msg: str) → None[source]

Write message to standard output.

carp(msg: str) → None[source]

Write warning to standard error.

on_init_dependencies() → Iterable[mode.types.services.ServiceT][source]

Callback to be used to add service dependencies.

on_setup_root_logger(logger: logging.Logger, level: int) → None[source]
install_signal_handlers() → None[source]
coroutine default_on_first_start() → None[source]
logger = <Logger mode.worker (WARNING)>
coroutine maybe_start_blockdetection() → None[source]
coroutine on_execute() → None[source]
coroutine on_first_start() → None[source]

Called only the first time the service is started.

coroutine start() → None[source]
execute_from_commandline() → None[source]
on_worker_shutdown() → None[source]
stop_and_shutdown() → None[source]
blocking_detector