Source code for mode.threads

"""ServiceThread - Service that starts in a separate thread.

Will use the default thread pool executor (``loop.set_default_executor()``),
unless you specify a specific executor instance.

Note: To stop something using the thread's loop, you have to
use the ``on_thread_stop`` callback instead of the on_stop callback.
"""
import asyncio
import sys
import threading
import traceback
from typing import (
    Any,
    Awaitable,
    Callable,
    Dict,
    List,
    NamedTuple,
    Optional,
    Tuple,
    Type,
)

from .services import Service
from .timers import timer_intervals
from .utils.futures import maybe_async, notify
from .utils.locks import Event

__all__ = [
    'QueuedMethod',
    'WorkerThread',
    'ServiceThread',
    'QueueServiceThread',
]


[docs]class QueuedMethod(NamedTuple): """Describe a method to be called by thread.""" promise: asyncio.Future method: Callable[..., Awaitable[Any]] args: Tuple[Any, ...] kwargs: Dict[str, Any]
[docs]class WorkerThread(threading.Thread): """Thread class used for services running in a dedicated thread.""" service: 'ServiceThread' _is_stopped: threading.Event def __init__(self, service: 'ServiceThread', **kwargs: Any) -> None: super().__init__(**kwargs) self.service = service self.daemon = False self._is_stopped = threading.Event()
[docs] def run(self) -> None: try: self.service._start_thread() finally: self._set_stopped()
def _set_stopped(self) -> None: try: self._is_stopped.set() except TypeError: # pragma: no cover # we lost the race at interpreter shutdown, # so gc collected built-in modules. pass
[docs] def stop(self) -> None: self._is_stopped.wait() if self.is_alive(): self.join(threading.TIMEOUT_MAX)
[docs]class ServiceThread(Service): """Service subclass running within a dedicated thread.""" Worker: Type[WorkerThread] = WorkerThread abstract = True wait_for_shutdown = True #: Set this to False if s.start() should not wait for the #: underlying thread to be fully started. wait_for_thread: bool = True _thread: Optional['WorkerThread'] = None _thread_started: Event _thread_running: Optional[asyncio.Future] = None def __init__(self, *, executor: Any = None, loop: asyncio.AbstractEventLoop = None, thread_loop: asyncio.AbstractEventLoop = None, Worker: Type[WorkerThread] = None, **kwargs: Any) -> None: # cannot share loop between threads, so create a new one assert asyncio.get_event_loop() if executor is not None: raise NotImplementedError('executor argument no longer supported') self.parent_loop = loop or asyncio.get_event_loop() self.thread_loop = thread_loop or asyncio.new_event_loop() self._thread_started = Event(loop=self.parent_loop) if Worker is not None: self.Worker = Worker super().__init__(loop=self.thread_loop, **kwargs) assert self._shutdown.loop is self.parent_loop async def on_thread_started(self) -> None: ... async def on_thread_stop(self) -> None: ... # The deal with asyncio.Event and threads. # # Every thread needs a dedicated event loop, but events can actually # be shared between threads in some ways: # # - Any thread can set/check the flag (.set() / .is_set()) # - Only the thread owning the loop can wait for the event # to be set (await .wait()) # So X(Service) adds dependency Y(ServiceThread) # We add a new _thread_started event owned by the parent loop. # # Original ._started event is owned by parent loop # # X calls await Y.start(): this starts a thread running Y._start_thread # Y starts the thread, and the thread calls super().start to start # the ServiceThread inside that thread. # After starting the thread will wait for _stopped to be set. # ._stopped is owned by thread loop # parent sets _stopped.set(), thread calls _stopped.wait() # and only wait needs the loop. # ._shutdown is owned by parent loop # thread calls _shutdown.set(), parent calls _shutdown.wait() def _new_shutdown_event(self) -> Event: return Event(loop=self.parent_loop) async def maybe_start(self) -> None: if not self._thread_started.is_set(): await self.start() async def start(self) -> None: # cannot await the future returned by run_in_executor, # as that would make us wait until the webserver exits. # Instead we add as Future dependency to this service, so that # it is stopped with `await service.stop()` assert not self._thread_started.is_set() self._thread_started.set() self._thread_running = asyncio.Future(loop=self.parent_loop) try: self._thread = self.Worker(self) self._thread.start() if not self.should_stop and self.wait_for_thread: # thread exceptions do not propagate to the main thread, # so we need some way to communicate socket open errors, # such as "port in use", back to the parent thread. # The _thread_running future is set to # an exception state when that happens, and awaiting will # propagate the error to the parent thread. # wait for thread to be fully started await self._thread_running finally: self._thread_running = None async def crash(self, exc: BaseException) -> None: if self._thread_running and not self._thread_running.done(): self._thread_running.set_exception(exc) # <- .start() will raise await super().crash(exc) def _start_thread(self) -> None: # set the default event loop for this thread asyncio.set_event_loop(self.thread_loop) try: self.thread_loop.run_until_complete(self._serve()) except Exception: # if self._serve raises an exception we need to set # shutdown here, since _shutdown_thread will not execute. self.set_shutdown() raise async def stop(self) -> None: if self._started.is_set(): await super().stop() async def _stop_children(self) -> None: ... # called by thread instead of .stop() async def _stop_futures(self) -> None: ... # called by thread instead of .stop() async def _stop_exit_stacks(self) -> None: ... # called by thread instead of .stop() async def _shutdown_thread(self) -> None: await self._default_stop_children() await self.on_thread_stop() self.set_shutdown() await self._default_stop_futures() if self._thread is not None: self._thread.stop() await self._default_stop_exit_stacks() async def _serve(self) -> None: try: # start the service await self._default_start() # allow ServiceThread.start() to return # when wait_for_thread is enabled. await self.on_thread_started() notify(self._thread_running) await self.wait_until_stopped() except asyncio.CancelledError: raise except BaseException as exc: # pylint: disable=broad-except self.on_crash('{0!r} crashed: {1!r}', self.label, exc) await self.crash(exc) if self.beacon.root is not None: await self.beacon.root.data.crash(exc) raise finally: await self._shutdown_thread() @Service.task async def _thread_keepalive(self) -> None: for sleep_time in timer_intervals(1.0, name='_thread_keepalive'): if self.should_stop: break # The consumer thread will have a separate event loop, # and so we use this trick to make sure our loop is # being scheduled to run something at all times. # # If we don't do this, anything waiting for new # stuff in the method queue may never get it. await asyncio.sleep(sleep_time, loop=self.thread_loop) if self.should_stop: break
[docs] def on_crash(self, msg: str, *fmt: Any, **kwargs: Any) -> None: print(msg.format(*fmt), file=sys.stderr) traceback.print_exc(None, sys.stderr)
class MethodQueueWorker(Service): index: int method_queue: 'MethodQueue' mundane_level = 'debug' def __init__(self, method_queue: 'MethodQueue', *, index: int, **kwargs: Any) -> None: self.method_queue = method_queue self.index = index super().__init__(**kwargs) @Service.task async def _method_queue_do_work(self) -> None: method_queue = self.method_queue queue_ready = method_queue._queue_ready wait = method_queue.wait get = method_queue._queue.get process_enqueued = method_queue._process_enqueued while not self.should_stop and not method_queue.should_stop: await wait(queue_ready) if not method_queue.should_stop: item = await get() await process_enqueued(item) class MethodQueue(Service): Worker: Type[MethodQueueWorker] = MethodQueueWorker _queue: asyncio.Queue _queue_ready: Event _workers: List[MethodQueueWorker] mundane_level = 'debug' def __init__(self, loop: asyncio.AbstractEventLoop, num_workers: int = 2, **kwargs: Any) -> None: super().__init__(**kwargs) self._queue = asyncio.Queue(loop=self.loop) self._queue_ready = Event(loop=self.loop) self.num_workers = num_workers self._workers = [] async def on_start(self) -> None: self._workers[:] = [ self.Worker(self, index=i, loop=self.loop, beacon=self.beacon) for i in range(self.num_workers) ] for worker in self._workers: await self.add_runtime_dependency(worker) async def on_stop(self) -> None: await self.flush() self._workers[:] = [] async def call(self, promise: asyncio.Future, fun: Callable[..., Awaitable], *args: Any, **kwargs: Any) -> asyncio.Future: await self._queue.put(QueuedMethod(promise, fun, args, kwargs)) self._queue_ready.set() return promise async def cast(self, fun: Callable[..., Awaitable], *args: Any, **kwargs: Any) -> None: promise = self.loop.create_future() await self._queue.put(QueuedMethod(promise, fun, args, kwargs)) self._queue_ready.set() async def flush(self) -> None: while 1: try: p = self._queue.get_nowait() except asyncio.QueueEmpty: break else: await self._process_enqueued(p) async def _process_enqueued(self, p: QueuedMethod) -> asyncio.Future: promise, method, args, kwargs = p if not promise.cancelled(): try: result = await maybe_async(method(*args, **kwargs)) except BaseException as exc: if not promise.cancelled(): promise.set_exception(exc) else: if not promise.cancelled(): promise.set_result(result) return promise
[docs]class QueueServiceThread(ServiceThread): """Service running in separate thread. Uses a queue to run functions inside the thread, so you can delegate calls. """ abstract = True _method_queue: Optional[MethodQueue] = None @property def method_queue(self) -> MethodQueue: if self._method_queue is None: self._method_queue = MethodQueue( loop=self.thread_loop, beacon=self.beacon, ) return self._method_queue async def on_thread_started(self) -> None: await self.method_queue.start() async def on_thread_stop(self) -> None: if self._method_queue is not None: await self._method_queue.stop() async def call_thread(self, fun: Callable[..., Awaitable], *args: Any, **kwargs: Any) -> Any: # Enqueue method to be called by thread (synchronous). # We pass a future to the thread, so that when the call is done # the thread will call `future.set_result(result)`. promise = await self.method_queue.call( self.parent_loop.create_future(), fun, *args, **kwargs) # wait for the promise to be fulfilled result = await promise return result async def cast_thread(self, fun: Callable[..., Awaitable], *args: Any, **kwargs: Any) -> None: # Enqueue method to be called by thread (asynchronous). await self.method_queue.cast(fun, *args, **kwargs)