from __future__ import annotations
import asyncio
import concurrent.futures
import threading
from typing import Any, Coroutine, Optional, TypeVar
TReturn = TypeVar("TReturn")
TSelf = TypeVar("TSelf", bound="EventLoopThread")
[docs]class EventLoopThread(threading.Thread):
"""An event loop thread that provides thread-safe utility functions.
.. note::
The thread cannot be started more than once because of the constraints
of :class:`threading.Thread`. The class raises :class:`RuntimeError` if
it is already terminated.
Example:
>>> async def _get_ident() -> int:
... return threading.get_ident()
...
>>> thread = EventLoopThread()
>>> with thread:
... main, sub = await asyncio.gather(
... _get_ident(),
... thread.run_coroutine(_get_ident()),
... )
>>> main == sub
False
"""
[docs] def __init__(
self,
loop_policy: Optional[asyncio.AbstractEventLoopPolicy] = None,
daemon: bool = False,
start: bool = False,
) -> None:
"""Creates a new event loop thread.
Args:
loop_policy:
A :obj:`asyncio.AbstractEventLoopPolicy` object to be used
for creating a new event loop. If :obj:`None` is specified,
``asyncio.get_event_loop_policy()`` is used to get the policy.
daemon:
If ``True`` is specified, the thread is created with ``daemon=True``.
Refer to `threading.Thread.daemon`_ for further details.
.. _threading.Thread.daemon:
https://docs.python.org/3/library/threading.html#threading.Thread.daemon
start:
If ``True`` is specified, the thread is started immediately.
"""
self._loop_policy = loop_policy
self._lock = threading.Lock()
self._future: concurrent.futures.Future[None] = concurrent.futures.Future()
self._loop: Optional[asyncio.AbstractEventLoop] = None
super().__init__(target=self._target_impl, daemon=daemon)
if start:
self.start()
def _target_impl(self) -> None:
future = self._future
loop: asyncio.AbstractEventLoop
try:
loop_policy = self._loop_policy
if loop_policy is None:
loop_policy = asyncio.get_event_loop_policy()
loop = loop_policy.new_event_loop()
except Exception as ex:
future.set_exception(ex)
try:
self._loop = loop
future.set_result(None)
asyncio.set_event_loop(loop)
loop.run_forever()
finally:
self._loop = None
[docs] def get_loop(self) -> asyncio.AbstractEventLoop:
"""Get an event loop of the running thread.
The thread must be running. The method will raise :class:`RuntimeError`
if the thread is not running.
See also:
Use :attr:`EventLoopThread.loop` if you need an accessor to the loop.
"""
loop = self._loop
if loop is None:
raise RuntimeError("Thread is not running")
return loop
@property
def loop(self) -> asyncio.AbstractEventLoop:
"""Get an event loop of the running thread.
The property just returns the return value of :func:`EventLoopThread.get_loop`.
"""
return self.get_loop()
[docs] def start(self) -> None:
"""Start the thread and wait for a new event loop to be ready.
If the thread is already started, it returns immediately.
Raises:
RuntimeError:
If the thread is already terminated.
"""
with self._lock:
if self.is_alive():
return
if self._future.done():
raise RuntimeError("threads can only be started once")
super().start()
# Wait until loop is created
self._future.result()
[docs] def shutdown(self, join: bool = True) -> None:
"""Shutdown the running event loop to terminate the thread.
If the event loop is not running, it returns immediately.
Args:
join:
If `True` is specified, the method waits for the thread to be terminated.
Raises:
RumtimeError:
The method raises a :class:`RuntimeError` if ``join = True`` and the method
is called by the same thread as ``self.loop`` in order to avoid a deadlock.
See `threading.Thread.join`_ for further details.
.. _threading.Thread.join:
https://docs.python.org/3/library/threading.html#threading.Thread.join
"""
loop = self._loop
if loop is None:
return
running_loop = loop
assert isinstance(running_loop, asyncio.AbstractEventLoop)
running_loop.call_soon_threadsafe(lambda: running_loop.stop())
if join:
self.join()
def __enter__(self: TSelf) -> TSelf:
"""Initialize the event loop if it is not started."""
self.start()
return self
def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
"""Shutdown the event loop if it is running."""
self.shutdown()
[docs] def run_coroutine_concurrent(
self, coro: Coroutine[Any, Any, TReturn]
) -> concurrent.futures.Future[TReturn]:
"""Submit a coroutine in the event loop.
Args:
coro: A `Coroutine` object to run.
Returns:
A :class:`concurrent.future.Future` object that returns the execution result of
a given coroutine.
"""
loop = self.loop
return asyncio.run_coroutine_threadsafe(coro, loop)
[docs] def run_coroutine(
self,
coro: Coroutine[Any, Any, TReturn],
*,
loop: Optional[asyncio.AbstractEventLoop] = None,
) -> asyncio.Future[TReturn]:
"""Submit a coroutine in a new thread and waits for its completion in a given ``loop``.
Args:
coro: A `Coroutine` object to run.
loop: An event loop to wait for the completion of ``coro``.
Returns:
A :class:`asyncio.Future` object that returns the execution result of a given
coroutine.
"""
future = self.run_coroutine_concurrent(coro)
return asyncio.wrap_future(future, loop=loop)
[docs]def run_coroutine_in_thread(
coro: Coroutine[Any, Any, TReturn],
*,
loop: Optional[asyncio.AbstractEventLoop] = None,
) -> asyncio.Future[TReturn]:
"""Submit a coroutine in a new thread and waits for its completion in a given ``loop``.
Example:
>>> async def _get_ident() -> int:
... return threading.get_ident()
...
>>> main, sub = await asyncio.gather(
... _get_ident(),
... asyncx.run_coroutine_in_thread(_get_ident()),
... )
>>> main == sub
False
Args:
coro: A coroutine to run in a new thread.
loop: An event loop to wait for the completion of ``coro``.
Returns:
A :class:`asyncio.Future` object that returns the execution result of
a given coroutine.
"""
thread = EventLoopThread(start=True)
async def impl() -> TReturn:
try:
return await coro
finally:
thread.shutdown(join=False)
return thread.run_coroutine(impl(), loop=loop)