The Service class manages the services and background tasks started by the async program, so that we can implement graceful shutdown and also helps us visualize the relationships between services in a dependency graph.

Anything that can be started/stopped and restarted should probably be a subclass of the Service class.

The Service API

A service can be started, and it may start other services and background tasks. Most actions in a service are asynchronous, so needs to be executed from within an async function.

This first section defines the public service API, as if used by the user, the next section will define the methods service authors write to define new services.


class mode.Service
async start() → None
async maybe_start() → bool

Start the service, if it has not already been started.

async stop() → None

Stop the service.

async restart() → None

Restart this service.

async wait_until_stopped() → None

Wait until the service is signalled to stop.

set_shutdown() → None

Set the shutdown signal.


If wait_for_shutdown is set, stopping the service will wait for this flag to be set.


class mode.Service

Return True if the service was started.


Label used for graphs.


Label used for logging.


Beacon used to track services in a dependency graph.

Defining new services

Adding child services

Child services can be added in three ways,

  1. Using add_dependency() in __post_init__:

    class MyService(Service):
        def __post_init__(self) -> None:
  2. Using add_dependency() in on_start:

    class MyService(Service):
        async def on_start(self) -> None:
  3. Using on_init_dependencies()

    This is is a method that if customized should return an iterable of service instances:

    from typing import Iterable
    from mode import Service, ServiceT
    class MyService(Service):
        def on_init_dependencies(self) -> Iterable[ServiceT]:
            return [ServiceA(), ServiceB()]


Knowing exactly what is called, when it’s called and in what order is important, and this table will help you understand that:

Order at start (await Service.start())

  1. The on_first_start callback is called.

  2. Service logs: "[Service] Starting...".

  3. on_start callback is called.

  4. All @Service.task background tasks are started (in definition order).

  5. All child services added by add_dependency(), or on_init_dependencies()) are started.

  6. Service logs: "[Service] Started".

  7. The on_started callback is called.

Order when stopping (await Service.stop())

  1. Service logs; "[Service] Stopping...".

  2. The on_stop() callback is called.

  3. All child services are stopped, in reverse order.

  4. All asyncio futures added by add_future() are cancelled in reverse order.

  5. Service logs: "[Service] Stopped".

  6. If Service.wait_for_shutdown = True, it will wait for the Service.set_shutdown() signal to be called.

  7. All futures started by add_future() will be gathered (awaited).

  8. The on_shutdown() callback is called.

  9. The service logs: "[Service] Shutdown complete!".

Order when restarting (await Service.restart())

  1. The service is stopped (await service.stop()).

  2. The __post_init__() callback is called again.

  3. The service is started (await service.start()).


class mode.Service
async on_start() → None

Service is starting.

async on_first_start() → None

Service started for the first time in this process.

async on_started() → None

Service has started.

async on_stop() → None

Service is being stopped/restarted.

async on_shutdown() → None

Service is being stopped/restarted.

async on_restart() → None

Service is being restarted.

Handling Errors

class mode.Service
async crash(reason: BaseException) → None

Crash the service and all child services.


class mode.Service
async sleep(n: Union[datetime.timedelta, float, str], *, loop: = None) → None

Sleep for n seconds, or until service stopped.

async wait(*coros: Union[Generator[[Any, None], Any], Awaitable, asyncio.locks.Event, mode.utils.locks.Event], timeout: Union[datetime.timedelta, float, str] = None) →

Wait for coroutines to complete, or until the service stops.


Your service may add logging to notify the user what is going on, and the Service class includes some shortcuts to include the service name etc. in logs.

The self.log delegate contains shortcuts for logging:

# examples/

from mode import Service

class MyService(Service):

    async def on_start(self) -> None:
        self.log.debug('This is a debug message')'This is a info message')
        self.log.warn('This is a warning message')
        self.log.error('This is a error message')
        self.log.exception('This is a error message with traceback')
        self.log.critical('This is a critical message')

        self.log.debug('I can also include templates: %r %d %s',
                       [1, 2, 3], 303, 'string')

The logs will be emitted by a logger with the same name as the module the Service class is defined in. It’s similar to this setup, that you can do if you want to manually define the logger used by the service:

# examples/

from mode import Service, get_logger

logger = get_logger(__name__)

class MyService(Service):
    logger = logger