mode.services
¶
Async I/O services that can be started/stopped/shutdown.
-
class
mode.services.
ServiceBase
(*, loop: asyncio.events.AbstractEventLoop = None)¶ Base class for services.
-
abstract
: ClassVar[bool] = True¶ Set to True if this service class is abstract-only, meaning it will only be used as a base class.
-
logger
: logging.Logger = None¶
-
log
: CompositeLogger = None¶
-
property
loop
¶
-
-
class
mode.services.
Diag
(service: mode.types.services.ServiceT)¶ Service diagnostics.
This can be used to track what your service is doing. For example, if your service is a Kafka consumer with a background thread that commits the offset every 30 seconds, you may want to see when this happens:
    DIAG_COMMITTING = 'committing'

    class Consumer(Service):

        @Service.task
        async def _background_commit(self) -> None:
            while not self.should_stop:
                await self.sleep(30.0)
                self.diag.set_flag(DIAG_COMMITTING)
                try:
                    await self._consumer.commit()
                finally:
                    self.diag.unset_flag(DIAG_COMMITTING)
The above code sets the flag manually, but you can also use a decorator to accomplish the same thing:
    @Service.timer(30.0)
    async def _background_commit(self) -> None:
        await self.commit()

    @Service.transitions_to(DIAG_COMMITTING)
    async def commit(self) -> None:
        await self._consumer.commit()
-
flags
= None¶
-
last_transition
= None¶
-
set_flag
(flag: str) → None¶
-
unset_flag
(flag: str) → None¶
-
-
class
mode.services.
Service
(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None)¶ An asyncio service that can be started/stopped/restarted.
- Keyword Arguments
beacon (NodeT) – Beacon used to track services in a graph.
loop (asyncio.AbstractEventLoop) – Event loop object.
-
abstract
: ClassVar[bool] = False¶
-
class
Diag
(service: mode.types.services.ServiceT)¶ Service diagnostics.
This can be used to track what your service is doing. For example, if your service is a Kafka consumer with a background thread that commits the offset every 30 seconds, you may want to see when this happens:
    DIAG_COMMITTING = 'committing'

    class Consumer(Service):

        @Service.task
        async def _background_commit(self) -> None:
            while not self.should_stop:
                await self.sleep(30.0)
                self.diag.set_flag(DIAG_COMMITTING)
                try:
                    await self._consumer.commit()
                finally:
                    self.diag.unset_flag(DIAG_COMMITTING)
The above code sets the flag manually, but you can also use a decorator to accomplish the same thing:
    @Service.timer(30.0)
    async def _background_commit(self) -> None:
        await self.commit()

    @Service.transitions_to(DIAG_COMMITTING)
    async def commit(self) -> None:
        await self._consumer.commit()
-
flags
= None¶
-
last_transition
= None¶
-
set_flag
(flag: str) → None¶
-
unset_flag
(flag: str) → None¶
-
-
wait_for_shutdown
= False¶ Set to True if .stop must wait for the shutdown flag to be set.
-
shutdown_timeout
= 60.0¶ Time in seconds to wait for the shutdown flag to be set before giving up.
-
restart_count
= 0¶ Current number of times this service instance has been restarted.
-
mundane_level
= 'info'¶ The log level for mundane info such as starting, stopping, etc. Set this to
"debug"
for less information.
-
classmethod
from_awaitable
(coro: Awaitable, *, name: str = None, **kwargs: Any) → mode.types.services.ServiceT¶
-
classmethod
task
(fun: Callable[Any, Awaitable[None]]) → mode.services.ServiceTask¶ Decorate function to be used as background task.
Example
>>> class S(Service):
...
...     @Service.task
...     async def background_task(self):
...         while not self.should_stop:
...             await self.sleep(1.0)
...             print('Waking up')
-
classmethod
timer
(interval: Union[datetime.timedelta, float, str]) → Callable[Callable, mode.services.ServiceTask]¶ Background timer executing every
n
seconds.
Example
>>> class S(Service):
...
...     @Service.timer(1.0)
...     async def background_timer(self):
...         print('Waking up')
-
classmethod
transitions_to
(flag: str) → Callable¶ Decorate function to set and reset diagnostic flag.
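To make "set and reset diagnostic flag" concrete, here is a minimal stdlib-only sketch of what such a decorator can look like. It is an illustration under stated assumptions, not mode's implementation: `FlagTracker`, `Committer`, and the local `transitions_to` function are hypothetical stand-ins written for this example.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable, List, Set


class FlagTracker:
    """Hypothetical stand-in for a diagnostics object with set/unset methods."""

    def __init__(self) -> None:
        self.flags: Set[str] = set()
        self.history: List[str] = []

    def set_flag(self, flag: str) -> None:
        self.flags.add(flag)
        self.history.append(f'set:{flag}')

    def unset_flag(self, flag: str) -> None:
        self.flags.discard(flag)
        self.history.append(f'unset:{flag}')


def transitions_to(flag: str) -> Callable:
    """Set `flag` while the wrapped coroutine method runs, then unset it."""
    def decorator(fun: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
        @functools.wraps(fun)
        async def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
            self.diag.set_flag(flag)
            try:
                return await fun(self, *args, **kwargs)
            finally:
                # Runs even if the wrapped method raises.
                self.diag.unset_flag(flag)
        return wrapper
    return decorator


class Committer:
    """Hypothetical object exposing a `diag` attribute, as the sketch assumes."""

    def __init__(self) -> None:
        self.diag = FlagTracker()
        self.seen_during_commit: Set[str] = set()

    @transitions_to('committing')
    async def commit(self) -> str:
        # Record which flags are active while the method body runs.
        self.seen_during_commit = set(self.diag.flags)
        return 'done'


committer = Committer()
result = asyncio.run(committer.commit())
```

The `try`/`finally` mirrors the manual pattern from the `Diag` example: the flag is cleared whether or not the wrapped call succeeds.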
-
async
transition_with
(flag: str, fut: Awaitable, *args: Any, **kwargs: Any) → Any¶
-
add_dependency
(service: mode.types.services.ServiceT) → mode.types.services.ServiceT¶ Add dependency to other service.
The service will be started/stopped with this service.
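The idea of a dependency that is started and stopped together with its parent can be sketched with a small stand-in class. `MiniService` is hypothetical and is not mode's `Service`; in particular, the ordering used here (children start before the parent, and stop after it in reverse order) is an assumption of the sketch rather than a documented guarantee.

```python
import asyncio
from typing import List


class MiniService:
    """Hypothetical stand-in that records lifecycle events in a shared list."""

    def __init__(self, name: str, events: List[str]) -> None:
        self.name = name
        self.events = events
        self.children: List['MiniService'] = []

    def add_dependency(self, service: 'MiniService') -> 'MiniService':
        # Return the child so callers can keep a reference to it.
        self.children.append(service)
        return service

    async def start(self) -> None:
        # Assumed ordering: dependencies first, then this service.
        for child in self.children:
            await child.start()
        self.events.append(f'start:{self.name}')

    async def stop(self) -> None:
        # Assumed ordering: this service first, then dependencies in reverse.
        self.events.append(f'stop:{self.name}')
        for child in reversed(self.children):
            await child.stop()


async def main() -> List[str]:
    events: List[str] = []
    app = MiniService('app', events)
    app.add_dependency(MiniService('db', events))
    app.add_dependency(MiniService('cache', events))
    await app.start()
    await app.stop()
    return events


lifecycle = asyncio.run(main())
```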
-
async
add_runtime_dependency
(service: mode.types.services.ServiceT) → mode.types.services.ServiceT¶
-
async
remove_dependency
(service: mode.types.services.ServiceT) → mode.types.services.ServiceT¶ Stop and remove dependency of this service.
-
async
add_async_context
(context: AsyncContextManager) → Any¶
-
add_context
(context: ContextManager) → Any¶
-
add_future
(coro: Awaitable) → _asyncio.Future¶ Add relationship to asyncio.Future.
The future will be joined when this service is stopped.
-
on_init
() → None¶
-
on_init_dependencies
() → Iterable[mode.types.services.ServiceT]¶ Return list of service dependencies for this service.
-
async
join_services
(services: Sequence[mode.types.services.ServiceT]) → None¶
-
async
sleep
(n: Union[datetime.timedelta, float, str], *, loop: asyncio.events.AbstractEventLoop = None) → None¶ Sleep for
n
seconds, or until service stopped.
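A "sleep, or wake early on stop" primitive can be sketched with plain asyncio by waiting on a stop event with a timeout. This is a stdlib-only illustration of the behavior described above, not mode's implementation; `sleep_or_stop` and the `stopped` event are hypothetical names.

```python
import asyncio
from typing import Tuple


async def sleep_or_stop(seconds: float, stopped: asyncio.Event) -> bool:
    """Sleep for `seconds`; return True if woken early because `stopped` was set."""
    try:
        await asyncio.wait_for(stopped.wait(), timeout=seconds)
    except asyncio.TimeoutError:
        return False  # the full sleep elapsed without a stop request
    return True  # the stop signal arrived before the timeout


async def main() -> Tuple[bool, bool, float]:
    stopped = asyncio.Event()
    loop = asyncio.get_running_loop()

    # No stop request: the short sleep runs to completion.
    full = await sleep_or_stop(0.01, stopped)

    # Stop requested shortly after a long sleep begins: it returns early.
    loop.call_later(0.01, stopped.set)
    began = loop.time()
    early = await sleep_or_stop(5.0, stopped)
    elapsed = loop.time() - began
    return full, early, elapsed


full, early, elapsed = asyncio.run(main())
```

Using a primitive like this inside a `while not self.should_stop` loop is what lets a background task react to a stop request without waiting out the whole interval.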
-
async
wait_for_stopped
(*coros: Union[Generator[[Any, None], Any], Awaitable, asyncio.locks.Event, mode.utils.locks.Event], timeout: Union[datetime.timedelta, float, str] = None) → bool¶
-
async
wait
(*coros: Union[Generator[[Any, None], Any], Awaitable, asyncio.locks.Event, mode.utils.locks.Event], timeout: Union[datetime.timedelta, float, str] = None) → mode.services.WaitResult¶ Wait for coroutines to complete, or until the service stops.
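Waiting for work to finish "or until the service stops" amounts to racing the work against a stop signal. The sketch below shows that pattern with stdlib asyncio only. `wait_or_stop` and the `Outcome` tuple are hypothetical; the fields of mode's own `WaitResult` are not described here and are not assumed.

```python
import asyncio
from typing import Any, Awaitable, NamedTuple, Tuple


class Outcome(NamedTuple):
    """Hypothetical result type for this sketch."""
    result: Any
    stopped: bool


async def wait_or_stop(aw: Awaitable, stopped: asyncio.Event) -> Outcome:
    """Return the awaitable's result, or stopped=True if the stop event wins."""
    work = asyncio.ensure_future(aw)
    stop = asyncio.ensure_future(stopped.wait())
    try:
        await asyncio.wait({work, stop}, return_when=asyncio.FIRST_COMPLETED)
        if work.done():
            return Outcome(work.result(), False)
        return Outcome(None, True)
    finally:
        # Cancel whichever side lost the race so nothing is left pending.
        for fut in (work, stop):
            if not fut.done():
                fut.cancel()


async def compute() -> int:
    await asyncio.sleep(0.01)
    return 42


async def main() -> Tuple[Outcome, Outcome]:
    stopped = asyncio.Event()
    finished = await wait_or_stop(compute(), stopped)
    stopped.set()
    interrupted = await wait_or_stop(asyncio.sleep(5.0), stopped)
    return finished, interrupted


finished, interrupted = asyncio.run(main())
```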
-
async
wait_many
(coros: Iterable[Union[Generator[[Any, None], Any], Awaitable, asyncio.locks.Event, mode.utils.locks.Event]], *, timeout: Union[datetime.timedelta, float, str] = None) → mode.services.WaitResult¶
-
async
wait_first
(*coros: Union[Generator[[Any, None], Any], Awaitable, asyncio.locks.Event, mode.utils.locks.Event], timeout: Union[datetime.timedelta, float, str] = None) → mode.services.WaitResults¶
-
async
start
() → None¶
-
async
maybe_start
() → bool¶ Start the service, if it has not already been started.
-
async
crash
(reason: BaseException) → None¶ Crash the service and all child services.
-
async
stop
() → None¶ Stop the service.
-
async
restart
() → None¶ Restart this service.
-
service_reset
() → None¶
-
async
wait_until_stopped
() → None¶ Wait until the service is signalled to stop.
-
set_shutdown
() → None¶ Set the shutdown signal.
Notes
If
wait_for_shutdown
is set, stopping the service will wait for this flag to be set.
-
itertimer
(interval: Union[datetime.timedelta, float, str], *, max_drift_correction: float = 0.1, loop: asyncio.events.AbstractEventLoop = None, sleep: Callable[..., Awaitable] = None, clock: Callable[float] = <built-in function perf_counter>, name: str = '') → AsyncIterator[float]¶ Sleep
interval
seconds for every iteration.
This is an async iterator that takes advantage of
Timer()
to monitor drift and timer overlap.
Uses
Service.sleep
so it exits quickly when the service is stopped.
Note
Will sleep the full interval seconds before returning from first iteration.
Examples
>>> async for sleep_time in self.itertimer(1.0):
...     print('another second passed, just woke up...')
...     await perform_some_http_request()
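The general shape of an interval iterator that sleeps before its first yield, nudges each sleep toward a fixed schedule, and exits when a stop signal is set can be sketched as follows. This is a simplified stdlib-only illustration, not mode's `Timer`: `interval_timer` and `stopped` are hypothetical, and the clamping rule is one plausible reading of a "maximum drift correction".

```python
import asyncio
import time
from typing import AsyncIterator, List


async def interval_timer(interval: float, stopped: asyncio.Event,
                         max_drift_correction: float = 0.1) -> AsyncIterator[float]:
    """Yield the time slept once per interval until `stopped` is set."""
    expected_wakeup = time.perf_counter() + interval
    while not stopped.is_set():
        # Aim for the scheduled wakeup, but adjust the nominal interval
        # by at most max_drift_correction in either direction.
        remaining = expected_wakeup - time.perf_counter()
        sleep_time = min(max(remaining, interval - max_drift_correction),
                         interval + max_drift_correction)
        sleep_time = max(sleep_time, 0.0)
        try:
            await asyncio.wait_for(stopped.wait(), timeout=sleep_time)
        except asyncio.TimeoutError:
            pass  # normal case: the interval elapsed
        else:
            return  # stop requested while sleeping
        expected_wakeup += interval
        yield sleep_time


async def main() -> List[float]:
    stopped = asyncio.Event()
    ticks: List[float] = []
    async for slept in interval_timer(0.01, stopped):
        ticks.append(slept)
        if len(ticks) == 3:
            stopped.set()  # ends the iteration on the next loop check
    return ticks


ticks = asyncio.run(main())
```

If the loop body runs long, `remaining` shrinks and the next sleep is shortened within the allowed correction, which keeps wakeups close to the original schedule.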
-
property
started
¶ Return
True
if the service was started.
-
property
crashed
¶
-
property
should_stop
¶ Return
True
if the service must stop.
-
property
state
¶ Service state - as a human readable string.
-
property
label
¶ Label used for graphs.
-
property
shortlabel
¶ Label used for logging.
-
property
beacon
¶ Beacon used to track services in a dependency graph.
-
logger
= <Logger mode.services (WARNING)>¶
-
property
crash_reason
¶
-
mode.services.
task
(fun: Callable[Any, Awaitable[None]]) → mode.services.ServiceTask¶ Decorate function to be used as background task.
Example
>>> class S(Service):
...
...     @Service.task
...     async def background_task(self):
...         while not self.should_stop:
...             await self.sleep(1.0)
...             print('Waking up')
-
mode.services.
timer
(interval: Union[datetime.timedelta, float, str]) → Callable[Callable, mode.services.ServiceTask]¶ Background timer executing every
n
seconds.
Example
>>> class S(Service):
...
...     @Service.timer(1.0)
...     async def background_timer(self):
...         print('Waking up')