Source code for asyncx_ros2.decorator

import asyncio
import functools
from typing import Any, Callable, cast

from asyncx._types import EventLoopSelector, TAsyncCallable

from .future import concurrent_to_ros_future


[docs]def wrap_as_ros_coroutine( loop_selector: EventLoopSelector, ) -> Callable[[TAsyncCallable], TAsyncCallable]: """A decorator to wrap an async function running on an asyncio event loop with a :class:`rclpy.task.Future`. Example: >>> @asyncx_ros2.wrap_as_ros_coroutine(get_event_loop) ... async def foo() -> None: ... await asyncio.sleep(1) ... print("Called!") ... >>> node.create_timer(0.5, foo) Args: loop_selector: An event loop on which a coroutine runs. The value must be either an event loop or a callable that returns an event loop. """ def deco(func: TAsyncCallable) -> TAsyncCallable: @functools.wraps(func) async def wrapper(*args: Any, **kwargs: Any) -> Any: target_loop: asyncio.AbstractEventLoop if callable(loop_selector): target_loop = loop_selector() else: target_loop = loop_selector coro = func(*args, **kwargs) f = asyncio.run_coroutine_threadsafe(coro, target_loop) return await concurrent_to_ros_future(f) return cast(TAsyncCallable, wrapper) return deco