import asyncio
import functools
from typing import Any, Callable, Coroutine, cast
from ._types import EventLoopSelector, TAsyncCallable, TReturn
def dispatch(
    loop_selector: EventLoopSelector,
) -> Callable[[TAsyncCallable], TAsyncCallable]:
    """A decorator to dispatch an async function to another event loop.

    Each call of the decorated function resolves the target event loop,
    creates the coroutine and awaits :func:`dispatch_coroutine` with it.

    Example:
        >>> async def foo() -> int:
        ...     return threading.get_ident()
        ...
        >>> @asyncx.dispatch(get_event_loop)
        ... async def foo_dispatch() -> int:
        ...     return threading.get_ident()
        ...
        >>> current, dispatched = await asyncio.gather(
        ...     foo(), foo_dispatch(),
        ... )
        >>> current != dispatched
        True

    Args:
        loop_selector:
            Target event loop to which a coroutine is dispatched.
            The value must be either an event loop or a callable that returns
            an event loop. A callable is invoked on every call of the
            decorated function, not once at decoration time.

    Returns:
        A decorator that wraps an async function so that awaiting it runs the
        function's coroutine through :func:`dispatch_coroutine`.
    """

    def deco(func: TAsyncCallable) -> TAsyncCallable:
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> TReturn:
            # Resolve the loop per call so that a selector callable may return
            # a different loop each time.
            target_loop: asyncio.AbstractEventLoop
            if callable(loop_selector):
                target_loop = loop_selector()
            else:
                target_loop = loop_selector

            # The coroutine is created only after the loop has been resolved,
            # so a failing selector does not leave an un-awaited coroutine.
            coro = func(*args, **kwargs)
            return await dispatch_coroutine(
                coro,
                target_loop=target_loop,
            )

        # functools.wraps hides the original signature from the type checker;
        # the cast restores the decorated function's declared type.
        return cast(TAsyncCallable, wrapper)

    return deco


async def dispatch_coroutine(
    coro: Coroutine[Any, Any, TReturn],
    target_loop: asyncio.AbstractEventLoop,
) -> TReturn:
    """Execute the specified coroutine on the specified event loop.

    When ``target_loop`` is the loop that is running the caller, the coroutine
    is simply awaited in place. Otherwise it is submitted to ``target_loop``
    with :func:`asyncio.run_coroutine_threadsafe` and the caller awaits the
    resulting future on its own loop.

    Example:
        >>> async def foo() -> int:
        ...     return threading.get_ident()
        ...
        >>> current, dispatched = await asyncio.gather(
        ...     foo(),
        ...     asyncx.dispatch_coroutine(foo(), other_loop),
        ... )
        >>> current != dispatched
        True

    Args:
        coro:
            A coroutine to be dispatched.
        target_loop:
            An event loop to execute the ``coro``.

    Returns:
        The value returned by ``coro``.

    Raises:
        Exception: Whatever ``coro`` raises, or whatever
            :func:`asyncio.run_coroutine_threadsafe` raises when the
            coroutine cannot be submitted to ``target_loop`` (in that case
            ``coro`` is closed before the error propagates).
    """
    caller_loop = asyncio.get_running_loop()

    # Event loops are compared by identity; no cross-thread hop is needed when
    # the caller is already running on the target loop.
    if target_loop is caller_loop:
        return await coro

    try:
        future = asyncio.run_coroutine_threadsafe(coro, target_loop)
    except BaseException:
        # The coroutine was never handed over to the target loop. Close it so
        # it does not linger and emit a "never awaited" RuntimeWarning.
        if asyncio.iscoroutine(coro):
            coro.close()
        raise

    # Bridge the concurrent.futures.Future into an awaitable bound to the
    # caller's loop.
    return await asyncio.wrap_future(future, loop=caller_loop)