mode.services
Async I/O services that can be started/stopped/shutdown.
-
class
mode.services.
ServiceBase
(*, loop: asyncio.events.AbstractEventLoop = None)[source]¶ Base class for services.
-
abstract
= True¶ Set to True if this service class is abstract-only, meaning it will only be used as a base class.
-
logger
= None¶ Logger used by this service. If not explicitly set, this will be based on get_logger(cls.__name__).
-
property
loop
-
-
class
mode.services.
Diag
(service: mode.types.services.ServiceT)[source]¶ Service diagnostics.
This can be used to track what your service is doing. For example, if your service is a Kafka consumer with a background thread that commits the offset every 30 seconds, you may want to see when this happens:
    DIAG_COMMITTING = 'committing'

    class Consumer(Service):

        @Service.task
        async def _background_commit(self) -> None:
            while not self.should_stop:
                await self.sleep(30.0)
                self.diag.set_flag(DIAG_COMMITTING)
                try:
                    await self._consumer.commit()
                finally:
                    self.diag.unset_flag(DIAG_COMMITTING)
The above code sets the flag manually, but you can also use a decorator to accomplish the same thing:
    @Service.timer(30.0)
    async def _background_commit(self) -> None:
        await self.commit()

    @Service.transitions_to(DIAG_COMMITTING)
    async def commit(self) -> None:
        await self._consumer.commit()
-
class
mode.services.
Service
(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None)[source]¶ An asyncio service that can be started/stopped/restarted.
- Keyword Arguments
beacon (NodeT) – Beacon used to track services in a graph.
loop (asyncio.AbstractEventLoop) – Event loop object.
-
abstract
= False¶
-
class
Diag
(service: mode.types.services.ServiceT)¶ Service diagnostics.
This can be used to track what your service is doing. For example, if your service is a Kafka consumer with a background thread that commits the offset every 30 seconds, you may want to see when this happens:
    DIAG_COMMITTING = 'committing'

    class Consumer(Service):

        @Service.task
        async def _background_commit(self) -> None:
            while not self.should_stop:
                await self.sleep(30.0)
                self.diag.set_flag(DIAG_COMMITTING)
                try:
                    await self._consumer.commit()
                finally:
                    self.diag.unset_flag(DIAG_COMMITTING)
The above code sets the flag manually, but you can also use a decorator to accomplish the same thing:
    @Service.timer(30.0)
    async def _background_commit(self) -> None:
        await self.commit()

    @Service.transitions_to(DIAG_COMMITTING)
    async def commit(self) -> None:
        await self._consumer.commit()
-
set_flag
(flag: str) → None¶
-
unset_flag
(flag: str) → None¶
-
-
wait_for_shutdown
= False¶ Set to True if .stop must wait for the shutdown flag to be set.
-
shutdown_timeout
= 60.0¶ Time (in seconds) to wait for the shutdown flag to be set before we give up.
-
restart_count
= 0¶ Current number of times this service instance has been restarted.
-
mundane_level
= 'info'¶ The log level for mundane info such as starting, stopping, etc. Set this to "debug" for less information.
-
classmethod
from_awaitable
(coro: Awaitable, *, name: str = None, **kwargs: Any) → mode.types.services.ServiceT[source]¶
-
classmethod
task
(fun: Callable[Any, Awaitable[None]]) → mode.services.ServiceTask[source]¶ Decorate function to be used as background task.
Example
>>> class S(Service):
...
...     @Service.task
...     async def background_task(self):
...         while not self.should_stop:
...             await self.sleep(1.0)
...             print('Waking up')
-
classmethod
timer
(interval: Union[datetime.timedelta, float, str]) → Callable[Callable[mode.types.services.ServiceT, Awaitable[None]], mode.services.ServiceTask][source]¶ Background timer executing every n seconds.
Example
>>> class S(Service):
...
...     @Service.timer(1.0)
...     async def background_timer(self):
...         print('Waking up')
-
classmethod
transitions_to
(flag: str) → Callable[source]¶ Decorate function to set and reset diagnostic flag.
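The set-and-reset behaviour described above can be illustrated with plain asyncio. This is a minimal sketch, not mode's implementation: `FlagTracker`, `transitions_to_sketch`, and `Committer` are hypothetical names invented for the example, and only the `set_flag`/`unset_flag` method names are taken from the `Diag` class documented on this page.

```python
import asyncio
import functools
from typing import Any, Callable, List, Set, Tuple


class FlagTracker:
    """Hypothetical stand-in for a diagnostics object (not mode's Diag)."""

    def __init__(self) -> None:
        self.flags: Set[str] = set()
        self.history: List[Tuple[str, str]] = []

    def set_flag(self, flag: str) -> None:
        self.flags.add(flag)
        self.history.append(("set", flag))

    def unset_flag(self, flag: str) -> None:
        self.flags.discard(flag)
        self.history.append(("unset", flag))


def transitions_to_sketch(flag: str) -> Callable:
    """Hypothetical decorator: keep `flag` set while the method runs."""

    def decorate(fun: Callable) -> Callable:
        @functools.wraps(fun)
        async def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
            self.diag.set_flag(flag)
            try:
                return await fun(self, *args, **kwargs)
            finally:
                # Clear the flag even if the wrapped method raises.
                self.diag.unset_flag(flag)

        return wrapper

    return decorate


class Committer:
    """Hypothetical object that owns a diagnostics tracker."""

    def __init__(self) -> None:
        self.diag = FlagTracker()
        self.seen_during_commit: Set[str] = set()

    @transitions_to_sketch("committing")
    async def commit(self) -> None:
        # Record which flags are active while the method body runs.
        self.seen_during_commit = set(self.diag.flags)
        await asyncio.sleep(0)


committer = Committer()
asyncio.run(committer.commit())
```

The `try`/`finally` mirrors the manual pattern shown in the `Diag` example, so the flag does not stay set after a failed call.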
-
add_dependency
(service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]¶ Add dependency to other service.
The service will be started/stopped with this service.
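To make "started/stopped with this service" concrete, here is a small stdlib-only sketch. `MiniService` is a hypothetical class, not part of mode, and the ordering it uses (dependencies started in the order added, stopped in reverse) is an assumption of the sketch rather than something this page states.

```python
import asyncio
from typing import List


class MiniService:
    """Hypothetical service that starts and stops its dependencies."""

    def __init__(self, name: str, log: List[str]) -> None:
        self.name = name
        self.log = log
        self.children: List["MiniService"] = []

    def add_dependency(self, service: "MiniService") -> "MiniService":
        # Return the dependency so the call can be used inline.
        self.children.append(service)
        return service

    async def start(self) -> None:
        self.log.append(f"start {self.name}")
        for child in self.children:
            await child.start()

    async def stop(self) -> None:
        # Assumption: stop dependencies in reverse order of addition.
        for child in reversed(self.children):
            await child.stop()
        self.log.append(f"stop {self.name}")


async def main() -> List[str]:
    log: List[str] = []
    app = MiniService("app", log)
    app.add_dependency(MiniService("db", log))
    app.add_dependency(MiniService("cache", log))
    await app.start()
    await app.stop()
    return log


log = asyncio.run(main())
```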
-
add_future
(coro: Awaitable) → _asyncio.Future[source]¶ Add relationship to asyncio.Future.
The future will be joined when this service is stopped.
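The idea of joining tracked futures on stop can be sketched with plain asyncio. `FutureOwner` is a hypothetical class, not mode's implementation, and cancelling the futures before joining them is an assumption made so that the example terminates.

```python
import asyncio
from typing import Awaitable, List


class FutureOwner:
    """Hypothetical owner that tracks futures and joins them on stop."""

    def __init__(self) -> None:
        self._futures: List[asyncio.Future] = []

    def add_future(self, coro: Awaitable) -> asyncio.Future:
        # Schedule the awaitable and remember the resulting future.
        fut = asyncio.ensure_future(coro)
        self._futures.append(fut)
        return fut

    async def stop(self) -> None:
        # Assumption of this sketch: cancel first, then wait for completion.
        for fut in self._futures:
            fut.cancel()
        # return_exceptions=True collects CancelledError instead of raising it.
        await asyncio.gather(*self._futures, return_exceptions=True)
        self._futures.clear()


async def main() -> asyncio.Future:
    owner = FutureOwner()
    fut = owner.add_future(asyncio.sleep(3600))
    await asyncio.sleep(0)  # let the future start running
    await owner.stop()
    return fut


finished = asyncio.run(main())
```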
-
on_init_dependencies
() → Iterable[mode.types.services.ServiceT][source]¶ Return list of service dependencies for this service.
-
set_shutdown
() → None[source]¶ Set the shutdown signal.
Notes
If wait_for_shutdown is set, stopping the service will wait for this flag to be set.
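The interaction between `wait_for_shutdown`, `shutdown_timeout`, and `set_shutdown()` might look roughly like the following. This is a sketch under stated assumptions: `ShutdownWaiter` is a hypothetical class, the shutdown flag is modelled as an `asyncio.Event`, and the timeout is shortened so the example runs quickly. It is not mode's actual `stop()` logic.

```python
import asyncio


class ShutdownWaiter:
    """Hypothetical service whose stop() waits for a shutdown flag."""

    wait_for_shutdown = True
    shutdown_timeout = 0.5  # shortened for the example

    def __init__(self) -> None:
        self._shutdown = asyncio.Event()
        self.timed_out = False

    def set_shutdown(self) -> None:
        self._shutdown.set()

    async def stop(self) -> None:
        if self.wait_for_shutdown:
            try:
                # Block until the flag is set, or give up after the timeout.
                await asyncio.wait_for(
                    self._shutdown.wait(), timeout=self.shutdown_timeout
                )
            except asyncio.TimeoutError:
                self.timed_out = True


async def main() -> bool:
    service = ShutdownWaiter()
    # Something else signals shutdown shortly after stop() begins waiting.
    asyncio.get_running_loop().call_later(0.01, service.set_shutdown)
    await service.stop()
    return service.timed_out


timed_out = asyncio.run(main())
```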
-
itertimer
(interval: Union[datetime.timedelta, float, str], *, max_drift_correction: float = 0.1, loop: asyncio.events.AbstractEventLoop = None, sleep: Callable[..., Awaitable] = None, clock: Callable[float] = <built-in function perf_counter>, name: str = '') → AsyncIterator[float][source]¶ Sleep interval seconds for every iteration.
This is an async iterator that takes advantage of timer_intervals() to act as a timer that stops drift from occurring, and adds a tiny amount of drift to timers so that they don’t start at the same time.
Uses Service.sleep, which will bail out quickly if the service is stopped.
Note
Will sleep the full interval seconds before returning from first iteration.
Examples
>>> async for sleep_time in self.itertimer(1.0):
...     print('another second passed, just woke up...')
...     await perform_some_http_request()
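The drift-avoidance idea can be sketched with plain asyncio by sleeping until a fixed schedule of deadlines instead of sleeping a fixed amount after each iteration. `drift_free_timer` is a hypothetical helper, not mode's `timer_intervals()`. It leaves out the small start-time offset and the `max_drift_correction` limit mentioned above, and it uses `asyncio.sleep` rather than `Service.sleep`.

```python
import asyncio
import time
from typing import AsyncIterator, Callable, List


async def drift_free_timer(
    interval: float, *, clock: Callable[[], float] = time.perf_counter
) -> AsyncIterator[float]:
    """Yield after each interval, compensating for time spent by the caller."""
    next_deadline = clock() + interval
    while True:
        # Sleep only for the time remaining until the scheduled deadline.
        sleep_time = max(next_deadline - clock(), 0.0)
        await asyncio.sleep(sleep_time)
        yield sleep_time
        # Advance by a fixed step so slow iterations do not shift the schedule.
        next_deadline += interval


async def main() -> List[float]:
    sleeps: List[float] = []
    async for sleep_time in drift_free_timer(0.01):
        sleeps.append(sleep_time)
        if len(sleeps) == 3:
            break
    return sleeps


sleeps = asyncio.run(main())
```

Like `itertimer`, the sketch sleeps a full interval before the first iteration body runs.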
-
property
started
¶ Return True if the service was started.
-
property
crashed
-
property
should_stop
¶ Return True if the service must stop.
-
property
state
¶ Service state - as a human readable string.
-
property
label
¶ Label used for graphs.
-
property
shortlabel
¶ Label used for logging.
-
property
beacon
¶ Beacon used to track services in a dependency graph.
-
logger
= <Logger mode.services (WARNING)>¶
-
mode.services.
task
(fun: Callable[Any, Awaitable[None]]) → mode.services.ServiceTask¶ Decorate function to be used as background task.
Example
>>> class S(Service):
...
...     @Service.task
...     async def background_task(self):
...         while not self.should_stop:
...             await self.sleep(1.0)
...             print('Waking up')
-
mode.services.
timer
(interval: Union[datetime.timedelta, float, str]) → Callable[Callable[mode.types.services.ServiceT, Awaitable[None]], mode.services.ServiceTask]¶ Background timer executing every n seconds.
Example
>>> class S(Service):
...
...     @Service.timer(1.0)
...     async def background_timer(self):
...         print('Waking up')