mode
¶
AsyncIO Service-based programming.
-
class
mode.
Service
(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None)[source]¶ An asyncio service that can be started/stopped/restarted.
- Keyword Arguments
beacon (NodeT) – Beacon used to track services in a graph.
loop (asyncio.AbstractEventLoop) – Event loop object.
-
abstract
= False¶
-
class
Diag
(service: mode.types.services.ServiceT)¶ Service diagnostics.
This can be used to track what your service is doing. For example, if your service is a Kafka consumer with a background thread that commits the offset every 30 seconds, you may want to see when this happens:

    DIAG_COMMITTING = 'committing'

    class Consumer(Service):

        @Service.task
        async def _background_commit(self) -> None:
            while not self.should_stop:
                await self.sleep(30.0)
                self.diag.set_flag(DIAG_COMMITTING)
                try:
                    await self._consumer.commit()
                finally:
                    self.diag.unset_flag(DIAG_COMMITTING)
The above code is setting the flag manually, but you can also use a decorator to accomplish the same thing:
    @Service.timer(30.0)
    async def _background_commit(self) -> None:
        await self.commit()

    @Service.transitions_to(DIAG_COMMITTING)
    async def commit(self) -> None:
        await self._consumer.commit()
-
set_flag
(flag: str) → None¶
-
unset_flag
(flag: str) → None¶
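The set/unset pattern described above can be sketched with the standard library alone. This is a minimal illustration, not mode's implementation: `FlagTracker`, the local `transitions_to` helper, and this `Consumer` are hypothetical stand-ins written for the example, and the `history` and `seen_during_commit` attributes exist only to make the behaviour observable.

```python
import asyncio
import functools
from typing import Any, Callable, List, Set


class FlagTracker:
    """Hypothetical stand-in for a diagnostics object (not mode's Diag)."""

    def __init__(self) -> None:
        self.flags: Set[str] = set()
        self.history: List[str] = []  # order in which flags were raised

    def set_flag(self, flag: str) -> None:
        self.flags.add(flag)
        self.history.append(flag)

    def unset_flag(self, flag: str) -> None:
        self.flags.discard(flag)


def transitions_to(flag: str) -> Callable:
    """Set `flag` on self.diag while the wrapped coroutine runs."""
    def decorator(fun: Callable) -> Callable:
        @functools.wraps(fun)
        async def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
            self.diag.set_flag(flag)
            try:
                return await fun(self, *args, **kwargs)
            finally:
                # Always clear the flag, even if the call raises.
                self.diag.unset_flag(flag)
        return wrapper
    return decorator


class Consumer:
    """Hypothetical service-like class used only for this example."""

    def __init__(self) -> None:
        self.diag = FlagTracker()
        self.seen_during_commit: Set[str] = set()

    @transitions_to('committing')
    async def commit(self) -> None:
        # Record which flags are active while the body runs.
        self.seen_during_commit = set(self.diag.flags)
        await asyncio.sleep(0)
```

Running `asyncio.run(Consumer().commit())` raises the flag for the duration of the call and clears it afterwards, which is the same effect the manual `try`/`finally` version achieves.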
-
-
wait_for_shutdown
= False¶ Set to True if .stop must wait for the shutdown flag to be set.
-
shutdown_timeout
= 60.0¶ Time to wait for the shutdown flag to be set before we give up.
-
restart_count
= 0¶ Current number of times this service instance has been restarted.
-
mundane_level
= 'info'¶ The log level for mundane info such as starting, stopping, etc. Set this to
"debug"
for less information.
-
classmethod
from_awaitable
(coro: Awaitable, *, name: str = None, **kwargs: Any) → mode.types.services.ServiceT[source]¶
-
classmethod
task
(fun: Callable[Any, Awaitable[None]]) → mode.services.ServiceTask[source]¶ Decorate function to be used as background task.
Example
>>> class S(Service):
...
...     @Service.task
...     async def background_task(self):
...         while not self.should_stop:
...             await self.sleep(1.0)
...             print('Waking up')
-
classmethod
timer
(interval: Union[datetime.timedelta, float, str]) → Callable[Callable[mode.types.services.ServiceT, Awaitable[None]], mode.services.ServiceTask][source]¶ Background timer executing every
n
seconds.
Example
>>> class S(Service):
...
...     @Service.timer(1.0)
...     async def background_timer(self):
...         print('Waking up')
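To make the idea of a background timer concrete, here is a rough stdlib-only sketch of a loop that calls a coroutine every `interval` seconds until asked to stop. It does not show how mode implements `timer`. The names `run_every` and `demo` are hypothetical, and sleeping before the first call is a choice made for this sketch.

```python
import asyncio
from typing import Awaitable, Callable, List


async def run_every(interval: float,
                    callback: Callable[[], Awaitable[None]],
                    should_stop: asyncio.Event) -> int:
    """Call `callback` every `interval` seconds until `should_stop` is set.

    Returns the number of times the callback was called.
    """
    calls = 0
    while not should_stop.is_set():
        try:
            # Sleep for one interval, but wake up early if asked to stop.
            await asyncio.wait_for(should_stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            # The interval elapsed without a stop request: run the callback.
            await callback()
            calls += 1
    return calls


async def demo() -> int:
    should_stop = asyncio.Event()
    ticks: List[int] = []

    async def tick() -> None:
        ticks.append(1)
        if len(ticks) == 3:
            should_stop.set()  # request a stop after the third tick

    return await run_every(0.01, tick, should_stop)
```

Waiting on the stop event instead of a plain `asyncio.sleep` means a stop request does not have to wait for the current interval to finish.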
-
classmethod
transitions_to
(flag: str) → Callable[source]¶ Decorate function to set and reset diagnostic flag.
-
add_dependency
(service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]¶ Add dependency to other service.
The service will be started/stopped with this service.
-
add_future
(coro: Awaitable) → _asyncio.Future[source]¶ Add relationship to asyncio.Future.
The future will be joined when this service is stopped.
-
on_init_dependencies
() → Iterable[mode.types.services.ServiceT][source]¶ Return list of service dependencies for this service.
-
set_shutdown
() → None[source]¶ Set the shutdown signal.
Notes
If
wait_for_shutdown
is set, stopping the service will wait for this flag to be set.
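The interaction between a shutdown flag, `wait_for_shutdown`, and `shutdown_timeout` can be pictured with an `asyncio.Event` and `asyncio.wait_for`. The sketch below is an assumption-laden illustration only: `ShutdownGate` is a hypothetical class, and returning `False` on timeout is a choice made for this example, not mode's documented behaviour.

```python
import asyncio
from typing import Tuple


class ShutdownGate:
    """Hypothetical helper showing the flag-plus-timeout idea."""

    def __init__(self, wait_for_shutdown: bool = False,
                 shutdown_timeout: float = 60.0) -> None:
        self.wait_for_shutdown = wait_for_shutdown
        self.shutdown_timeout = shutdown_timeout
        self._shutdown = asyncio.Event()

    def set_shutdown(self) -> None:
        # Signal that it is now safe for stop() to complete.
        self._shutdown.set()

    async def stop(self) -> bool:
        """Return True if the flag was seen, False if we gave up waiting."""
        if not self.wait_for_shutdown:
            return True
        try:
            await asyncio.wait_for(self._shutdown.wait(),
                                   timeout=self.shutdown_timeout)
        except asyncio.TimeoutError:
            return False
        return True


async def demo() -> Tuple[bool, bool]:
    gate = ShutdownGate(wait_for_shutdown=True, shutdown_timeout=0.05)
    gave_up = await gate.stop()   # nobody has set the flag yet
    gate.set_shutdown()
    clean = await gate.stop()     # flag is set, returns immediately
    return gave_up, clean
```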
-
property
started
¶ Return
True
if the service was started.
-
property
crashed
¶
-
property
should_stop
¶ Return
True
if the service must stop.
-
property
state
¶ Service state - as a human readable string.
-
property
label
¶ Label used for graphs.
-
property
shortlabel
¶ Label used for logging.
-
property
beacon
¶ Beacon used to track services in a dependency graph.
-
logger
= <Logger mode.services (WARNING)>¶
-
mode.
task
(fun: Callable[Any, Awaitable[None]]) → mode.services.ServiceTask¶ Decorate function to be used as background task.
Example
>>> class S(Service):
...
...     @Service.task
...     async def background_task(self):
...         while not self.should_stop:
...             await self.sleep(1.0)
...             print('Waking up')
-
mode.
timer
(interval: Union[datetime.timedelta, float, str]) → Callable[Callable[mode.types.services.ServiceT, Awaitable[None]], mode.services.ServiceTask]¶ Background timer executing every
n
seconds.
Example
>>> class S(Service):
...
...     @Service.timer(1.0)
...     async def background_timer(self):
...         print('Waking up')
-
class
mode.
BaseSignal
(*, name: str = None, owner: Type = None, loop: asyncio.events.AbstractEventLoop = None, default_sender: Any = None, receivers: MutableSet[Any] = None, filter_receivers: MutableMapping[Any, MutableSet[Any]] = None)[source]¶ Base class for signal/observer pattern.
-
connect
(fun: Union[Callable[[T, Any, BaseSignalT, Any], None], Callable[[T, Any, BaseSignalT, Any], Awaitable[None]]] = None, **kwargs: Any) → Callable[source]¶
-
disconnect
(fun: Union[Callable[[T, Any, BaseSignalT, Any], None], Callable[[T, Any, BaseSignalT, Any], Awaitable[None]]], *, weak: bool = False, sender: Any = None) → None[source]¶
-
iter_receivers
(sender: T_contra) → Iterable[Union[Callable[[T, Any, mode.types.signals.BaseSignalT, Any], None], Callable[[T, Any, mode.types.signals.BaseSignalT, Any], Awaitable[None]]]][source]¶
-
property
ident
¶
-
property
label
¶
-
-
class
mode.
Signal
(*, name: str = None, owner: Type = None, loop: asyncio.events.AbstractEventLoop = None, default_sender: Any = None, receivers: MutableSet[Any] = None, filter_receivers: MutableMapping[Any, MutableSet[Any]] = None)[source]¶ Asynchronous signal (using
async def
functions).
-
class
mode.
SyncSignal
(*, name: str = None, owner: Type = None, loop: asyncio.events.AbstractEventLoop = None, default_sender: Any = None, receivers: MutableSet[Any] = None, filter_receivers: MutableMapping[Any, MutableSet[Any]] = None)[source]¶ Signal that is synchronous (using regular
def
functions).
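The difference between the asynchronous and synchronous signal flavours can be sketched with a tiny observer pattern. The classes below (`MiniSyncSignal`, `MiniSignal`) are hypothetical and much simpler than mode's signal classes: they ignore owners, default senders, weak references, and receiver filtering, and only show that one variant calls regular functions while the other awaits `async def` receivers.

```python
import asyncio
from typing import Any, Callable, List, Tuple


class _ReceiverList:
    """Shared bookkeeping for the two hypothetical signal classes."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.receivers: List[Callable[..., Any]] = []

    def connect(self, fun: Callable[..., Any]) -> Callable[..., Any]:
        # Returning fun lets connect() double as a decorator.
        self.receivers.append(fun)
        return fun

    def disconnect(self, fun: Callable[..., Any]) -> None:
        self.receivers.remove(fun)


class MiniSyncSignal(_ReceiverList):
    """Receivers are regular functions, called one after another."""

    def send(self, sender: Any, *args: Any, **kwargs: Any) -> None:
        for receiver in list(self.receivers):
            receiver(sender, *args, **kwargs)


class MiniSignal(_ReceiverList):
    """Receivers are async def functions, awaited one after another."""

    async def send(self, sender: Any, *args: Any, **kwargs: Any) -> None:
        for receiver in list(self.receivers):
            await receiver(sender, *args, **kwargs)


def demo_sync() -> List[Tuple[str, int]]:
    seen: List[Tuple[str, int]] = []
    on_saved = MiniSyncSignal('on_saved')

    @on_saved.connect
    def record(sender: str, value: int) -> None:
        seen.append((sender, value))

    on_saved.send('db', 1)
    on_saved.disconnect(record)
    on_saved.send('db', 2)  # no receivers left, nothing is recorded
    return seen


async def demo_async() -> List[Tuple[str, int]]:
    seen: List[Tuple[str, int]] = []
    on_saved = MiniSignal('on_saved')

    @on_saved.connect
    async def record(sender: str, value: int) -> None:
        await asyncio.sleep(0)
        seen.append((sender, value))

    await on_saved.send('db', 1)
    return seen
```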
-
class
mode.
ForfeitOneForAllSupervisor
(*services: mode.types.services.ServiceT, max_restarts: Union[datetime.timedelta, float, str] = 100.0, over: Union[datetime.timedelta, float, str] = 1.0, raises: Type[BaseException] = <class 'mode.exceptions.MaxRestartsExceeded'>, replacement: Callable[[mode.types.services.ServiceT, int], Awaitable[mode.types.services.ServiceT]] = None, **kwargs: Any)[source]¶ If one service in the group crashes, we give up on all of them.
-
logger
= <Logger mode.supervisors (WARNING)>¶
-
-
class
mode.
ForfeitOneForOneSupervisor
(*services: mode.types.services.ServiceT, max_restarts: Union[datetime.timedelta, float, str] = 100.0, over: Union[datetime.timedelta, float, str] = 1.0, raises: Type[BaseException] = <class 'mode.exceptions.MaxRestartsExceeded'>, replacement: Callable[[mode.types.services.ServiceT, int], Awaitable[mode.types.services.ServiceT]] = None, **kwargs: Any)[source]¶ Supervisor that does not restart a service when it crashes.
-
logger
= <Logger mode.supervisors (WARNING)>¶
-
-
class
mode.
OneForAllSupervisor
(*services: mode.types.services.ServiceT, max_restarts: Union[datetime.timedelta, float, str] = 100.0, over: Union[datetime.timedelta, float, str] = 1.0, raises: Type[BaseException] = <class 'mode.exceptions.MaxRestartsExceeded'>, replacement: Callable[[mode.types.services.ServiceT, int], Awaitable[mode.types.services.ServiceT]] = None, **kwargs: Any)[source]¶ Supervisor that restarts all services when a service crashes.
-
logger
= <Logger mode.supervisors (WARNING)>¶
-
-
class
mode.
OneForOneSupervisor
(*services: mode.types.services.ServiceT, max_restarts: Union[datetime.timedelta, float, str] = 100.0, over: Union[datetime.timedelta, float, str] = 1.0, raises: Type[BaseException] = <class 'mode.exceptions.MaxRestartsExceeded'>, replacement: Callable[[mode.types.services.ServiceT, int], Awaitable[mode.types.services.ServiceT]] = None, **kwargs: Any)[source]¶ Supervisor that simply restarts any crashed service.
-
logger
= <Logger mode.supervisors (WARNING)>¶
-
-
class
mode.
SupervisorStrategy
(*services: mode.types.services.ServiceT, max_restarts: Union[datetime.timedelta, float, str] = 100.0, over: Union[datetime.timedelta, float, str] = 1.0, raises: Type[BaseException] = <class 'mode.exceptions.MaxRestartsExceeded'>, replacement: Callable[[mode.types.services.ServiceT, int], Awaitable[mode.types.services.ServiceT]] = None, **kwargs: Any)[source]¶ Base class for all supervisor strategies.
-
logger
= <Logger mode.supervisors (WARNING)>¶
-
-
class
mode.
CrashingSupervisor
(*services: mode.types.services.ServiceT, max_restarts: Union[datetime.timedelta, float, str] = 100.0, over: Union[datetime.timedelta, float, str] = 1.0, raises: Type[BaseException] = <class 'mode.exceptions.MaxRestartsExceeded'>, replacement: Callable[[mode.types.services.ServiceT, int], Awaitable[mode.types.services.ServiceT]] = None, **kwargs: Any)[source]¶ Supervisor that crashes the whole program.
-
logger
= <Logger mode.supervisors (WARNING)>¶
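The one-line descriptions of the supervisor strategies can be compared side by side with a small decision function. This is a sketch of the semantics as described in the docstrings, not of mode's supervisor code: `Plan` and `plan_for` are hypothetical names, the strategy strings are made up for the example, and the restart limits (`max_restarts`, `over`) and `CrashingSupervisor` are not modelled.

```python
from dataclasses import dataclass
from typing import List, Sequence


@dataclass
class Plan:
    restart: List[int]  # indexes of services to restart
    give_up: List[int]  # indexes of services to abandon


def plan_for(strategy: str, crashed: Sequence[bool]) -> Plan:
    """Decide what to do with a group, given which services have crashed."""
    everyone = list(range(len(crashed)))
    failed = [i for i, has_crashed in enumerate(crashed) if has_crashed]
    if not failed:
        # Nothing crashed, so no strategy has anything to do.
        return Plan(restart=[], give_up=[])
    if strategy == 'one_for_one':
        return Plan(restart=failed, give_up=[])
    if strategy == 'one_for_all':
        return Plan(restart=everyone, give_up=[])
    if strategy == 'forfeit_one_for_one':
        return Plan(restart=[], give_up=failed)
    if strategy == 'forfeit_one_for_all':
        return Plan(restart=[], give_up=everyone)
    raise ValueError(f'unknown strategy: {strategy!r}')
```

For a group of three services where only the second has crashed, `plan_for('one_for_one', [False, True, False])` restarts index 1 only, while `'one_for_all'` restarts all three.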
-
-
class
mode.
ServiceT
(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None)[source]¶ Abstract type for an asynchronous service that can be started/stopped.
See also
-
wait_for_shutdown
= False¶
-
restart_count
= 0¶
-
supervisor
= None¶
-
abstract
add_dependency
(service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]¶
-
abstract property
started
¶
-
abstract property
crashed
¶
-
abstract property
should_stop
¶
-
abstract property
state
¶
-
abstract property
label
¶
-
abstract property
shortlabel
¶
-
property
beacon
¶
-
abstract property
loop
¶
-
-
class
mode.
BaseSignalT
(*, name: str = None, owner: Type = None, loop: asyncio.events.AbstractEventLoop = None, default_sender: Any = None, receivers: MutableSet[Any] = None, filter_receivers: MutableMapping[Any, MutableSet[Any]] = None)[source]¶ Base type for all signals.
-
class
mode.
SignalT
(*, name: str = None, owner: Type = None, loop: asyncio.events.AbstractEventLoop = None, default_sender: Any = None, receivers: MutableSet[Any] = None, filter_receivers: MutableMapping[Any, MutableSet[Any]] = None)[source]¶ Base class for all async signals (using
async def
).
-
class
mode.
SyncSignalT
(*, name: str = None, owner: Type = None, loop: asyncio.events.AbstractEventLoop = None, default_sender: Any = None, receivers: MutableSet[Any] = None, filter_receivers: MutableMapping[Any, MutableSet[Any]] = None)[source]¶ Base class for all synchronous signals (using regular
def
).
-
class
mode.
SupervisorStrategyT
(*services: mode.types.supervisors.ServiceT, max_restarts: Union[datetime.timedelta, float, str] = 100.0, over: Union[datetime.timedelta, float, str] = 1.0, raises: Type[BaseException] = None, replacement: Callable[[mode.types.supervisors.ServiceT, int], Awaitable[mode.types.supervisors.ServiceT]] = None, **kwargs: Any)[source]¶ Base type for all supervisor strategies.
-
class
mode.
flight_recorder
(logger: Any, *, timeout: Union[datetime.timedelta, float, str], loop: asyncio.events.AbstractEventLoop = None)[source]¶ Flight Recorder context for use with
with
statement.
This is a logging utility that emits its log messages only when something times out.
For example if you have a background thread that is sometimes hanging:
    class RedisCache(mode.Service):

        @mode.timer(1.0)
        async def _background_refresh(self) -> None:
            self._users = await self.redis_client.get(USER_KEY)
            self._posts = await self.redis_client.get(POSTS_KEY)

You want to figure out which line is hanging, but logging all the time produces far too much output. It also changes how fast the program runs, which can mask race conditions so that they never happen.
Use the flight recorder to save the logs and only log when it times out:
    logger = mode.get_logger(__name__)

    class RedisCache(mode.Service):

        @mode.timer(1.0)
        async def _background_refresh(self) -> None:
            with mode.flight_recorder(logger, timeout=10.0) as on_timeout:
                on_timeout.info(f'+redis_client.get({USER_KEY!r})')
                await self.redis_client.get(USER_KEY)
                on_timeout.info(f'-redis_client.get({USER_KEY!r})')

                on_timeout.info(f'+redis_client.get({POSTS_KEY!r})')
                await self.redis_client.get(POSTS_KEY)
                on_timeout.info(f'-redis_client.get({POSTS_KEY!r})')
If the body of this
with
statement completes before the timeout, the logs are discarded and never emitted. If it takes more than ten seconds to complete, we will see these messages in the log:

    [2018-04-19 09:43:55,877: WARNING]: Warning: Task timed out!
    [2018-04-19 09:43:55,878: WARNING]: Please make sure it is hanging before restarting.
    [2018-04-19 09:43:55,878: INFO]: [Flight Recorder-1] (started at Thu Apr 19 09:43:45 2018) Replaying logs...
    [2018-04-19 09:43:55,878: INFO]: [Flight Recorder-1] (Thu Apr 19 09:43:45 2018) +redis_client.get('user')
    [2018-04-19 09:43:55,878: INFO]: [Flight Recorder-1] (Thu Apr 19 09:43:49 2018) -redis_client.get('user')
    [2018-04-19 09:43:55,878: INFO]: [Flight Recorder-1] (Thu Apr 19 09:43:46 2018) +redis_client.get('posts')
    [2018-04-19 09:43:55,878: INFO]: [Flight Recorder-1] -End of log-
Now we know this
redis_client.get
call can take too long to complete, and we should consider adding a timeout to it.
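The buffering behaviour described above can be approximated with the standard library: messages are held in memory and replayed through a real logger only if a timer fires before the `with` block exits. This is a simplified sketch, not mode's `flight_recorder`. `MiniFlightRecorder`, `ListHandler`, and `demo` are hypothetical names, the replay message is invented for the example, and the sketch assumes it is entered from inside a running event loop.

```python
import asyncio
import logging
from typing import Any, List, Optional, Tuple


class MiniFlightRecorder:
    """Hypothetical recorder: buffer logs, replay them only on timeout."""

    def __init__(self, logger: logging.Logger, *, timeout: float) -> None:
        self.logger = logger
        self.timeout = timeout
        self._buffer: List[Tuple[int, str]] = []
        self._handle: Optional[asyncio.TimerHandle] = None

    def info(self, message: str) -> None:
        # Nothing is emitted yet; the message is only remembered.
        self._buffer.append((logging.INFO, message))

    def _replay(self) -> None:
        self.logger.warning('Task timed out, replaying logs')
        for level, message in self._buffer:
            self.logger.log(level, message)

    def __enter__(self) -> 'MiniFlightRecorder':
        loop = asyncio.get_running_loop()
        self._handle = loop.call_later(self.timeout, self._replay)
        return self

    def __exit__(self, *exc_info: Any) -> None:
        # Finished (or failed) before or after the timeout: stop the timer
        # and forget whatever was buffered.
        if self._handle is not None:
            self._handle.cancel()
        self._buffer.clear()


class ListHandler(logging.Handler):
    """Collects emitted messages so the demo can inspect them."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


async def demo(delay: float) -> List[str]:
    logger = logging.getLogger('flight_recorder_demo')
    logger.setLevel(logging.INFO)
    logger.propagate = False
    handler = ListHandler()
    logger.addHandler(handler)
    try:
        with MiniFlightRecorder(logger, timeout=0.05) as on_timeout:
            on_timeout.info('+step')
            await asyncio.sleep(delay)  # stands in for the slow call
            on_timeout.info('-step')
    finally:
        logger.removeHandler(handler)
    return handler.messages
```

With a short delay nothing is logged. With a delay longer than the timeout, the replay contains only the `+step` entry, which points at the call that was still in progress when the timer fired.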
-
mode.
setup_logging
(*, loglevel: Union[str, int] = None, logfile: Union[str, IO] = None, loghandlers: List[logging.StreamHandler] = None, logging_config: Dict = None) → int[source]¶ Configure logging subsystem.
-
class
mode.
Worker
(*services: mode.types.services.ServiceT, debug: bool = False, quiet: bool = False, logging_config: Dict = None, loglevel: Union[str, int] = None, logfile: Union[str, IO] = None, redirect_stdouts: bool = True, redirect_stdouts_level: Union[int, str] = None, stdout: IO = <_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>, stderr: IO = <_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'>, console_port: int = 50101, loghandlers: List[logging.StreamHandler] = None, blocking_timeout: Union[datetime.timedelta, float, str] = 10.0, loop: asyncio.events.AbstractEventLoop = None, daemon: bool = True, **kwargs: Any)[source]¶ Start mode service from the command-line.
-
on_init_dependencies
() → Iterable[mode.types.services.ServiceT][source]¶ Return list of service dependencies for this service.
-
logger
= <Logger mode.worker (WARNING)>¶
-
property
blocking_detector
¶
-