asyncx_ros2.wrap_as_ros_coroutine

asyncx_ros2.wrap_as_ros_coroutine(loop_selector)

A decorator that wraps an async function, which runs on an asyncio event loop, in an rclpy.task.Future, so that the function can be passed to ROS 2 as a callback (for example, a timer callback).

Example

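>>> import asyncio
>>> import asyncx_ros2
>>> # get_event_loop and node are assumed to be defined by the application.
>>> @asyncx_ros2.wrap_as_ros_coroutine(get_event_loop)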
... async def foo() -> None:
...     await asyncio.sleep(1)
...     print("Called!")
...
>>> node.create_timer(0.5, foo)

Parameters

loop_selector (Union[asyncio.events.AbstractEventLoop, Callable[[], asyncio.events.AbstractEventLoop]]) – The event loop on which the wrapped coroutine runs. The value must be either an event loop instance or a zero-argument callable that returns an event loop.
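The two accepted forms of loop_selector can be illustrated with a small, self-contained sketch. The helper resolve_loop and the alias LoopSelector below are hypothetical names introduced only for illustration; they are not part of asyncx_ros2, and the library's actual resolution logic may differ.

```python
import asyncio
from typing import Callable, Union

# Hypothetical alias mirroring the documented parameter type.
LoopSelector = Union[
    asyncio.AbstractEventLoop,
    Callable[[], asyncio.AbstractEventLoop],
]


def resolve_loop(loop_selector: LoopSelector) -> asyncio.AbstractEventLoop:
    """Return the event loop denoted by loop_selector (hypothetical helper)."""
    if isinstance(loop_selector, asyncio.AbstractEventLoop):
        # Already an event loop instance: use it directly.
        return loop_selector
    # Otherwise treat it as a zero-argument callable and call it.
    return loop_selector()


loop = asyncio.new_event_loop()
try:
    same_from_instance = resolve_loop(loop)          # loop passed directly
    same_from_callable = resolve_loop(lambda: loop)  # loop passed via a callable
finally:
    loop.close()
```

In this sketch, passing a callable defers the lookup until the selector is resolved, which may be convenient when the event loop does not exist yet at the time the decorator is applied.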

Return type

Callable[[asyncx._types.TAsyncCallable], asyncx._types.TAsyncCallable]