mode.types

class mode.types.DiagT(service: mode.types.services.ServiceT)

Diag keeps track of a service's diagnostic flags.

flags: Set[str] = None
last_transition: MutableMapping[str, float] = None
abstract set_flag(flag: str) → None
abstract unset_flag(flag: str) → None
class mode.types.ServiceT(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None)

Abstract type for an asynchronous service that can be started/stopped.

See also

mode.Service.

Diag: Type[DiagT] = None
diag: DiagT = None
async_exit_stack: AsyncExitStack = None
exit_stack: ExitStack = None
shutdown_timeout: float = None
wait_for_shutdown = False
restart_count: int = 0
supervisor: Optional[mode.types.supervisors.SupervisorStrategyT] = None
abstract add_dependency(service: mode.types.services.ServiceT) → mode.types.services.ServiceT
abstract async add_runtime_dependency(service: mode.types.services.ServiceT) → mode.types.services.ServiceT
abstract async add_async_context(context: AsyncContextManager) → Any
abstract add_context(context: ContextManager) → Any
abstract async start() → None
abstract async maybe_start() → bool
abstract async crash(reason: BaseException) → None
abstract async stop() → None
abstract service_reset() → None
abstract async restart() → None
abstract async wait_until_stopped() → None
abstract set_shutdown() → None
abstract property started
abstract property crashed
abstract property should_stop
abstract property state
abstract property label
abstract property shortlabel
property beacon
abstract property loop
abstract property crash_reason
class mode.types.BaseSignalT(*, name: str = None, owner: Type = None, loop: asyncio.events.AbstractEventLoop = None, default_sender: Any = None, receivers: MutableSet[Any] = None, filter_receivers: MutableMapping[Any, MutableSet[Any]] = None)

Base type for all signals.

name: str = None
owner: Optional[Type] = None
abstract clone(**kwargs: Any) → mode.types.signals.BaseSignalT
abstract with_default_sender(sender: Any = None) → mode.types.signals.BaseSignalT
abstract connect(fun: Union[Callable[[T, Any, BaseSignalT, Any], None], Callable[[T, Any, BaseSignalT, Any], Awaitable[None]]], **kwargs: Any) → Callable
abstract disconnect(fun: Union[Callable[[T, Any, BaseSignalT, Any], None], Callable[[T, Any, BaseSignalT, Any], Awaitable[None]]], *, sender: Any = None, weak: bool = True) → None
class mode.types.SignalT(*, name: str = None, owner: Type = None, loop: asyncio.events.AbstractEventLoop = None, default_sender: Any = None, receivers: MutableSet[Any] = None, filter_receivers: MutableMapping[Any, MutableSet[Any]] = None)

Base class for all async signals (using async def).

abstract async send(sender: T_contra, *args: Any, **kwargs: Any) → None
abstract clone(**kwargs: Any) → SignalT
abstract with_default_sender(sender: Any = None) → SignalT
name = None
owner = None
class mode.types.SyncSignalT(*, name: str = None, owner: Type = None, loop: asyncio.events.AbstractEventLoop = None, default_sender: Any = None, receivers: MutableSet[Any] = None, filter_receivers: MutableMapping[Any, MutableSet[Any]] = None)

Base class for all synchronous signals (using regular def).

abstract send(sender: T_contra, *args: Any, **kwargs: Any) → None
abstract clone(**kwargs: Any) → SyncSignalT
name = None
owner = None
abstract with_default_sender(sender: Any = None) → SyncSignalT
class mode.types.SupervisorStrategyT(*services: mode.types.supervisors.ServiceT, max_restarts: Union[datetime.timedelta, float, str] = 100.0, over: Union[datetime.timedelta, float, str] = 1.0, raises: Type[BaseException] = None, replacement: Callable[[mode.types.supervisors.ServiceT, int], Awaitable[mode.types.supervisors.ServiceT]] = None, **kwargs: Any)

Base type for all supervisor strategies.

max_restarts: float = None
over: float = None
raises: Type[BaseException] = None
abstract wakeup() → None
abstract add(*services: mode.types.supervisors.ServiceT) → None
abstract discard(*services: mode.types.supervisors.ServiceT) → None
abstract service_operational(service: mode.types.supervisors.ServiceT) → bool
abstract async restart_service(service: mode.types.supervisors.ServiceT) → None