Skip to content

Commit 623c112

Browse files
authored
Generalize callbacks for subscriptions (#260)
* Generalize callbacks to subscriptions By implementing a trait (SubscriptionCallback) on valid function signatures, and making subscriptions accept callbacks that implement this trait, we can now * Receive both plain and boxed messages * Optionally receive a MessageInfo along with the message, as the second argument * Soon, receive a loaned message instead of an owned one This corresponds to the functionality in any_subscription_callback.hpp in rclcpp.
1 parent ec85565 commit 623c112

File tree

6 files changed

+350
-41
lines changed

6 files changed

+350
-41
lines changed

rclrs/src/node.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ pub use self::graph::*;
66
use crate::rcl_bindings::*;
77
use crate::{
88
Client, ClientBase, Context, ParameterOverrideMap, Publisher, QoSProfile, RclrsError, Service,
9-
ServiceBase, Subscription, SubscriptionBase, ToResult,
9+
ServiceBase, Subscription, SubscriptionBase, SubscriptionCallback, ToResult,
1010
};
1111

1212
use std::cmp::PartialEq;
@@ -227,15 +227,14 @@ impl Node {
227227
///
228228
/// [1]: crate::Subscription
229229
// TODO: make subscription's lifetime depend on node's lifetime
230-
pub fn create_subscription<T, F>(
230+
pub fn create_subscription<T, Args>(
231231
&mut self,
232232
topic: &str,
233233
qos: QoSProfile,
234-
callback: F,
234+
callback: impl SubscriptionCallback<T, Args>,
235235
) -> Result<Arc<Subscription<T>>, RclrsError>
236236
where
237237
T: Message,
238-
F: FnMut(T) + 'static + Send,
239238
{
240239
let subscription = Arc::new(Subscription::<T>::new(self, topic, qos, callback)?);
241240
self.subscriptions

rclrs/src/subscription.rs

Lines changed: 60 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use crate::qos::QoSProfile;
33
use crate::Node;
44
use crate::{rcl_bindings::*, RclrsError};
55

6-
use std::boxed::Box;
76
use std::ffi::CStr;
87
use std::ffi::CString;
98
use std::marker::PhantomData;
@@ -12,7 +11,11 @@ use std::sync::{Arc, Mutex, MutexGuard};
1211

1312
use rosidl_runtime_rs::{Message, RmwMessage};
1413

14+
mod callback;
15+
mod message_info;
1516
mod readonly_loaned_message;
17+
pub use callback::*;
18+
pub use message_info::*;
1619
pub use readonly_loaned_message::*;
1720

1821
// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
@@ -71,7 +74,7 @@ where
7174
{
7275
pub(crate) handle: Arc<SubscriptionHandle>,
7376
/// The callback function that runs when a message was received.
74-
pub callback: Mutex<Box<dyn FnMut(T) + 'static + Send>>,
77+
pub callback: Mutex<AnySubscriptionCallback<T>>,
7578
message: PhantomData<T>,
7679
}
7780

@@ -80,17 +83,16 @@ where
8083
T: Message,
8184
{
8285
/// Creates a new subscription.
83-
pub(crate) fn new<F>(
86+
pub(crate) fn new<Args>(
8487
node: &Node,
8588
topic: &str,
8689
qos: QoSProfile,
87-
callback: F,
90+
callback: impl SubscriptionCallback<T, Args>,
8891
) -> Result<Self, RclrsError>
8992
// This uses pub(crate) visibility to avoid instantiating this struct outside
9093
// [`Node::create_subscription`], see the struct's documentation for the rationale
9194
where
9295
T: Message,
93-
F: FnMut(T) + 'static + Send,
9496
{
9597
// SAFETY: Getting a zero-initialized value is always safe.
9698
let mut rcl_subscription = unsafe { rcl_get_zero_initialized_subscription() };
@@ -129,7 +131,7 @@ where
129131

130132
Ok(Self {
131133
handle,
132-
callback: Mutex::new(Box::new(callback)),
134+
callback: Mutex::new(callback.into_callback()),
133135
message: PhantomData,
134136
})
135137
}
@@ -171,22 +173,44 @@ where
171173
// | rmw_take |
172174
// +-------------+
173175
// ```
174-
pub fn take(&self) -> Result<T, RclrsError> {
176+
pub fn take(&self) -> Result<(T, MessageInfo), RclrsError> {
175177
let mut rmw_message = <T as Message>::RmwMsg::default();
178+
let message_info = self.take_inner(&mut rmw_message)?;
179+
Ok((T::from_rmw_message(rmw_message), message_info))
180+
}
181+
182+
/// This is a version of take() that returns a boxed message.
183+
///
184+
/// This can be more efficient for messages containing large arrays.
185+
pub fn take_boxed(&self) -> Result<(Box<T>, MessageInfo), RclrsError> {
186+
let mut rmw_message = Box::new(<T as Message>::RmwMsg::default());
187+
let message_info = self.take_inner(&mut *rmw_message)?;
188+
// TODO: This will still use the stack in general. Change signature of
189+
// from_rmw_message to allow placing the result in a Box directly.
190+
let message = Box::new(T::from_rmw_message(*rmw_message));
191+
Ok((message, message_info))
192+
}
193+
194+
// Inner function, to be used by both regular and boxed versions.
195+
fn take_inner(
196+
&self,
197+
rmw_message: &mut <T as Message>::RmwMsg,
198+
) -> Result<MessageInfo, RclrsError> {
199+
let mut message_info = unsafe { rmw_get_zero_initialized_message_info() };
176200
let rcl_subscription = &mut *self.handle.lock();
177201
unsafe {
178202
// SAFETY: The first two pointers are valid/initialized, and do not need to be valid
179203
// beyond the function call.
180204
// The latter two pointers are explicitly allowed to be NULL.
181205
rcl_take(
182206
rcl_subscription,
183-
&mut rmw_message as *mut <T as Message>::RmwMsg as *mut _,
184-
std::ptr::null_mut(),
207+
rmw_message as *mut <T as Message>::RmwMsg as *mut _,
208+
&mut message_info,
185209
std::ptr::null_mut(),
186210
)
187211
.ok()?
188212
};
189-
Ok(T::from_rmw_message(rmw_message))
213+
Ok(MessageInfo::from_rmw_message_info(&message_info))
190214
}
191215
}
192216

@@ -233,19 +257,38 @@ where
233257
}
234258

235259
fn execute(&self) -> Result<(), RclrsError> {
236-
let msg = match self.take() {
237-
Ok(msg) => msg,
260+
// Immediately evaluated closure, to handle SubscriptionTakeFailed
261+
// outside this match
262+
match (|| {
263+
match &mut *self.callback.lock().unwrap() {
264+
AnySubscriptionCallback::Regular(cb) => {
265+
let (msg, _) = self.take()?;
266+
cb(msg)
267+
}
268+
AnySubscriptionCallback::Boxed(cb) => {
269+
let (msg, _) = self.take_boxed()?;
270+
cb(msg)
271+
}
272+
AnySubscriptionCallback::RegularWithMessageInfo(cb) => {
273+
let (msg, msg_info) = self.take()?;
274+
cb(msg, msg_info)
275+
}
276+
AnySubscriptionCallback::BoxedWithMessageInfo(cb) => {
277+
let (msg, msg_info) = self.take_boxed()?;
278+
cb(msg, msg_info)
279+
}
280+
}
281+
Ok(())
282+
})() {
238283
Err(RclrsError::RclError {
239284
code: RclReturnCode::SubscriptionTakeFailed,
240285
..
241286
}) => {
242287
// Spurious wakeup – this may happen even when a waitset indicated that this
243288
// subscription was ready, so it shouldn't be an error.
244-
return Ok(());
289+
Ok(())
245290
}
246-
Err(e) => return Err(e),
247-
};
248-
(*self.callback.lock().unwrap())(msg);
249-
Ok(())
291+
other => other,
292+
}
250293
}
251294
}

rclrs/src/subscription/callback.rs

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
use super::MessageInfo;
2+
3+
/// A trait for allowed callbacks for subscriptions.
4+
///
5+
/// See [`AnySubscriptionCallback`] for a list of possible callback signatures.
6+
pub trait SubscriptionCallback<T, Args>: Send {
7+
/// Converts the callback into an enum.
8+
///
9+
/// User code never needs to call this function.
10+
fn into_callback(self) -> AnySubscriptionCallback<T>;
11+
}
12+
13+
/// An enum capturing the various possible function signatures for subscription callbacks.
14+
///
15+
/// The correct enum variant is deduced by the [`SubscriptionCallback`] trait.
16+
pub enum AnySubscriptionCallback<T> {
17+
/// A callback with only the message as an argument.
18+
Regular(Box<dyn FnMut(T) + Send>),
19+
/// A callback with the message and the message info as arguments.
20+
RegularWithMessageInfo(Box<dyn FnMut(T, MessageInfo) + Send>),
21+
/// A callback with only the boxed message as an argument.
22+
Boxed(Box<dyn FnMut(Box<T>) + Send>),
23+
/// A callback with the boxed message and the message info as arguments.
24+
BoxedWithMessageInfo(Box<dyn FnMut(Box<T>, MessageInfo) + Send>),
25+
}
26+
27+
// We need one implementation per arity. This was inspired by Bevy's systems.
28+
impl<T, A0, Func> SubscriptionCallback<T, (A0,)> for Func
29+
where
30+
Func: FnMut(A0) + Send + 'static,
31+
(A0,): ArgTuple<T, Func = Box<dyn FnMut(A0) + Send>>,
32+
{
33+
fn into_callback(self) -> AnySubscriptionCallback<T> {
34+
<(A0,) as ArgTuple<T>>::into_callback_with_args(Box::new(self))
35+
}
36+
}
37+
38+
impl<T, A0, A1, Func> SubscriptionCallback<T, (A0, A1)> for Func
39+
where
40+
Func: FnMut(A0, A1) + Send + 'static,
41+
(A0, A1): ArgTuple<T, Func = Box<dyn FnMut(A0, A1) + Send>>,
42+
{
43+
fn into_callback(self) -> AnySubscriptionCallback<T> {
44+
<(A0, A1) as ArgTuple<T>>::into_callback_with_args(Box::new(self))
45+
}
46+
}
47+
48+
// Helper trait for SubscriptionCallback.
49+
//
50+
// For each tuple of args, it provides conversion from a function with
51+
// these args to the correct enum variant.
52+
trait ArgTuple<T> {
53+
type Func;
54+
fn into_callback_with_args(func: Self::Func) -> AnySubscriptionCallback<T>;
55+
}
56+
57+
impl<T> ArgTuple<T> for (Box<T>,) {
58+
type Func = Box<dyn FnMut(Box<T>) + Send>;
59+
fn into_callback_with_args(func: Self::Func) -> AnySubscriptionCallback<T> {
60+
AnySubscriptionCallback::Boxed(func)
61+
}
62+
}
63+
64+
impl<T> ArgTuple<T> for (Box<T>, MessageInfo) {
65+
type Func = Box<dyn FnMut(Box<T>, MessageInfo) + Send>;
66+
fn into_callback_with_args(func: Self::Func) -> AnySubscriptionCallback<T> {
67+
AnySubscriptionCallback::BoxedWithMessageInfo(func)
68+
}
69+
}
70+
71+
impl<T> ArgTuple<T> for (T,) {
72+
type Func = Box<dyn FnMut(T) + Send>;
73+
fn into_callback_with_args(func: Self::Func) -> AnySubscriptionCallback<T> {
74+
AnySubscriptionCallback::Regular(func)
75+
}
76+
}
77+
78+
impl<T> ArgTuple<T> for (T, MessageInfo) {
79+
type Func = Box<dyn FnMut(T, MessageInfo) + Send>;
80+
fn into_callback_with_args(func: Self::Func) -> AnySubscriptionCallback<T> {
81+
AnySubscriptionCallback::RegularWithMessageInfo(func)
82+
}
83+
}
84+
85+
#[cfg(test)]
86+
mod tests {
87+
use super::*;
88+
89+
#[test]
90+
fn callback_conversion() {
91+
struct Message;
92+
let cb = |_msg: Message| {};
93+
assert!(matches!(
94+
cb.into_callback(),
95+
AnySubscriptionCallback::<Message>::Regular(_)
96+
));
97+
let cb = |_msg: Message, _info: MessageInfo| {};
98+
assert!(matches!(
99+
cb.into_callback(),
100+
AnySubscriptionCallback::<Message>::RegularWithMessageInfo(_)
101+
));
102+
let cb = |_msg: Box<Message>| {};
103+
assert!(matches!(
104+
cb.into_callback(),
105+
AnySubscriptionCallback::<Message>::Boxed(_)
106+
));
107+
let cb = |_msg: Box<Message>, _info: MessageInfo| {};
108+
assert!(matches!(
109+
cb.into_callback(),
110+
AnySubscriptionCallback::<Message>::BoxedWithMessageInfo(_)
111+
));
112+
}
113+
}

0 commit comments

Comments
 (0)