mode.services

Async I/O services that can be started/stopped/shutdown.

class mode.services.ServiceBase(*, loop: asyncio.events.AbstractEventLoop = None)

Base class for services.

abstract: ClassVar[bool] = True

Set to True if this service class is abstract-only, meaning it will only be used as a base class.

logger: logging.Logger = None
log: CompositeLogger = None
property loop
class mode.services.Diag(service: mode.types.services.ServiceT)

Service diagnostics.

This can be used to track what your service is doing. For example if your service is a Kafka consumer with a background thread that commits the offset every 30 seconds, you may want to see when this happens:

DIAG_COMMITTING = 'committing'

class Consumer(Service):

    @Service.task
    async def _background_commit(self) -> None:
        while not self.should_stop:
            await self.sleep(30.0)
            self.diag.set_flag(DIAG_COMITTING)
            try:
                await self._consumer.commit()
            finally:
                self.diag.unset_flag(DIAG_COMMITTING)

The above code is setting the flag manually, but you can also use a decorator to accomplish the same thing:

@Service.timer(30.0)
async def _background_commit(self) -> None:
    await self.commit()

@Service.transitions_with(DIAG_COMITTING)
async def commit(self) -> None:
    await self._consumer.commit()
flags = None
last_transition = None
set_flag(flag: str) → None
unset_flag(flag: str) → None
class mode.services.Service(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None)

An asyncio service that can be started/stopped/restarted.

Keyword Arguments
abstract: ClassVar[bool] = False
class Diag(service: mode.types.services.ServiceT)

Service diagnostics.

This can be used to track what your service is doing. For example if your service is a Kafka consumer with a background thread that commits the offset every 30 seconds, you may want to see when this happens:

DIAG_COMMITTING = 'committing'

class Consumer(Service):

    @Service.task
    async def _background_commit(self) -> None:
        while not self.should_stop:
            await self.sleep(30.0)
            self.diag.set_flag(DIAG_COMITTING)
            try:
                await self._consumer.commit()
            finally:
                self.diag.unset_flag(DIAG_COMMITTING)

The above code is setting the flag manually, but you can also use a decorator to accomplish the same thing:

@Service.timer(30.0)
async def _background_commit(self) -> None:
    await self.commit()

@Service.transitions_with(DIAG_COMITTING)
async def commit(self) -> None:
    await self._consumer.commit()
flags = None
last_transition = None
set_flag(flag: str) → None
unset_flag(flag: str) → None
wait_for_shutdown = False

Set to True if .stop must wait for the shutdown flag to be set.

shutdown_timeout = 60.0

Time to wait for shutdown flag set before we give up.

restart_count = 0

Current number of times this service instance has been restarted.

mundane_level = 'info'

The log level for mundane info such as starting, stopping, etc. Set this to "debug" for less information.

classmethod from_awaitable(coro: Awaitable, *, name: str = None, **kwargs: Any) → mode.types.services.ServiceT
classmethod task(fun: Callable[Any, Awaitable[None]]) → mode.services.ServiceTask

Decorate function to be used as background task.

Example

>>> class S(Service):
...
...     @Service.task
...     async def background_task(self):
...         while not self.should_stop:
...             await self.sleep(1.0)
...             print('Waking up')
classmethod timer(interval: Union[datetime.timedelta, float, str]) → Callable[Callable, mode.services.ServiceTask]

Background timer executing every n seconds.

Example

>>> class S(Service):
...
...     @Service.timer(1.0)
...     async def background_timer(self):
...         print('Waking up')
classmethod transitions_to(flag: str) → Callable

Decorate function to set and reset diagnostic flag.

async transition_with(flag: str, fut: Awaitable, *args: Any, **kwargs: Any) → Any
add_dependency(service: mode.types.services.ServiceT) → mode.types.services.ServiceT

Add dependency to other service.

The service will be started/stopped with this service.

async add_runtime_dependency(service: mode.types.services.ServiceT) → mode.types.services.ServiceT
async remove_dependency(service: mode.types.services.ServiceT) → mode.types.services.ServiceT

Stop and remove dependency of this service.

async add_async_context(context: AsyncContextManager) → Any
add_context(context: ContextManager) → Any
add_future(coro: Awaitable) → _asyncio.Future

Add relationship to asyncio.Future.

The future will be joined when this service is stopped.

on_init() → None
on_init_dependencies() → Iterable[mode.types.services.ServiceT]

Return list of service dependencies for this service.

async join_services(services: Sequence[mode.types.services.ServiceT]) → None
async sleep(n: Union[datetime.timedelta, float, str], *, loop: asyncio.events.AbstractEventLoop = None) → None

Sleep for n seconds, or until service stopped.

async wait_for_stopped(*coros: Union[Generator[[Any, None], Any], Awaitable, asyncio.locks.Event, mode.utils.locks.Event], timeout: Union[datetime.timedelta, float, str] = None) → bool
async wait(*coros: Union[Generator[[Any, None], Any], Awaitable, asyncio.locks.Event, mode.utils.locks.Event], timeout: Union[datetime.timedelta, float, str] = None) → mode.services.WaitResult

Wait for coroutines to complete, or until the service stops.

async wait_many(coros: Iterable[Union[Generator[[Any, None], Any], Awaitable, asyncio.locks.Event, mode.utils.locks.Event]], *, timeout: Union[datetime.timedelta, float, str] = None) → mode.services.WaitResult
async wait_first(*coros: Union[Generator[[Any, None], Any], Awaitable, asyncio.locks.Event, mode.utils.locks.Event], timeout: Union[datetime.timedelta, float, str] = None) → mode.services.WaitResults
async start() → None
async maybe_start() → bool

Start the service, if it has not already been started.

async crash(reason: BaseException) → None

Crash the service and all child services.

async stop() → None

Stop the service.

async restart() → None

Restart this service.

service_reset() → None
async wait_until_stopped() → None

Wait until the service is signalled to stop.

set_shutdown() → None

Set the shutdown signal.

Notes

If wait_for_shutdown is set, stopping the service will wait for this flag to be set.

itertimer(interval: Union[datetime.timedelta, float, str], *, max_drift_correction: float = 0.1, loop: asyncio.events.AbstractEventLoop = None, sleep: Callable[..., Awaitable] = None, clock: Callable[float] = <built-in function perf_counter>, name: str = '') → AsyncIterator[float]

Sleep interval seconds for every iteration.

This is an async iterator that takes advantage of Timer() to monitor drift and timer oerlap.

Uses Service.sleep so exits fast when the service is stopped.

Note

Will sleep the full interval seconds before returning from first iteration.

Examples

>>> async for sleep_time in self.itertimer(1.0):
...   print('another second passed, just woke up...')
...   await perform_some_http_request()
property started

Return True if the service was started.

property crashed
property should_stop

Return True if the service must stop.

property state

Service state - as a human readable string.

property label

Label used for graphs.

property shortlabel

Label used for logging.

property beacon

Beacon used to track services in a dependency graph.

logger = <Logger mode.services (WARNING)>
property crash_reason
mode.services.task(fun: Callable[Any, Awaitable[None]]) → mode.services.ServiceTask

Decorate function to be used as background task.

Example

>>> class S(Service):
...
...     @Service.task
...     async def background_task(self):
...         while not self.should_stop:
...             await self.sleep(1.0)
...             print('Waking up')
mode.services.timer(interval: Union[datetime.timedelta, float, str]) → Callable[Callable, mode.services.ServiceTask]

Background timer executing every n seconds.

Example

>>> class S(Service):
...
...     @Service.timer(1.0)
...     async def background_timer(self):
...         print('Waking up')