Skip to content

Commit d03eafa

Browse files
authored
Extend SubscriptionCallback trait to ReadOnlyLoanedMessage (#266)
This was not a straightforward extension because 1. ReadOnlyLoanedMessage has a lifetime 2. The take_loaned_message() method was restricted to Subscription<T: RmwMessage>, i.e. not available for subscriptions on idiomatic messages. This has been fixed and now the method is implemented for Subscription<T: Message> like the other `take*` methods – though you still will get an RMW-native loaned message even if T is the idiomatic message type, because idiomatic message types can never be loaned.
1 parent 2ed0e4e commit d03eafa

File tree

6 files changed

+150
-79
lines changed

6 files changed

+150
-79
lines changed

examples/minimal_pub_sub/src/zero_copy_subscriber.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ fn main() -> Result<(), Error> {
1212
let _subscription = node.create_subscription::<std_msgs::msg::UInt32, _>(
1313
"topic",
1414
rclrs::QOS_PROFILE_DEFAULT,
15-
move |msg: std_msgs::msg::UInt32| {
15+
move |msg: rclrs::ReadOnlyLoanedMessage<'_, std_msgs::msg::UInt32>| {
1616
num_messages += 1;
1717
println!("I heard: '{}'", msg.data);
1818
println!("(Got {} messages so far)", num_messages);

rclrs/src/publisher.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,12 +179,13 @@ where
179179
/// 1. Publishers and subscriptions are on the same machine
180180
/// 1. The message is a "plain old data" type containing no variable-size members, whether bounded or unbounded
181181
/// 1. The publisher's QOS settings are compatible with zero-copy, e.g. the [default QOS][2]
182-
/// 1. `Publisher::borrow_loaned_message()` and [`Subscription::take_loaned_message()`][1] are used
182+
/// 1. `Publisher::borrow_loaned_message()` is used and the subscription uses a callback taking a
183+
/// [`ReadOnlyLoanedMessage`][1]
183184
///
184185
/// This function is only implemented for [`RmwMessage`]s since the "idiomatic" message type
185186
/// does not have a typesupport library.
186187
///
187-
/// [1]: crate::Subscription::take_loaned_message
188+
/// [1]: crate::ReadOnlyLoanedMessage
188189
/// [2]: crate::QOS_PROFILE_DEFAULT
189190
//
190191
// TODO: Explain more, e.g.

rclrs/src/subscription.rs

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -212,12 +212,7 @@ where
212212
};
213213
Ok(MessageInfo::from_rmw_message_info(&message_info))
214214
}
215-
}
216215

217-
impl<T> Subscription<T>
218-
where
219-
T: RmwMessage,
220-
{
221216
/// Obtains a read-only handle to a message owned by the middleware.
222217
///
223218
/// When there is no new message, this will return a
@@ -228,23 +223,28 @@ where
228223
///
229224
/// [1]: crate::RclrsError
230225
/// [2]: crate::Publisher::borrow_loaned_message
231-
pub fn take_loaned_message(&self) -> Result<ReadOnlyLoanedMessage<'_, T>, RclrsError> {
226+
pub fn take_loaned(&self) -> Result<(ReadOnlyLoanedMessage<'_, T>, MessageInfo), RclrsError> {
232227
let mut msg_ptr = std::ptr::null_mut();
228+
let mut message_info = unsafe { rmw_get_zero_initialized_message_info() };
233229
unsafe {
234230
// SAFETY: The third argument (message_info) and fourth argument (allocation) may be null.
235231
// The second argument (loaned_message) contains a null ptr as expected.
236232
rcl_take_loaned_message(
237233
&*self.handle.lock(),
238234
&mut msg_ptr,
239-
std::ptr::null_mut(),
235+
&mut message_info,
240236
std::ptr::null_mut(),
241237
)
242238
.ok()?;
243239
}
244-
Ok(ReadOnlyLoanedMessage {
245-
msg_ptr: msg_ptr as *const T,
240+
let read_only_loaned_msg = ReadOnlyLoanedMessage {
241+
msg_ptr: msg_ptr as *const T::RmwMsg,
246242
subscription: self,
247-
})
243+
};
244+
Ok((
245+
read_only_loaned_msg,
246+
MessageInfo::from_rmw_message_info(&message_info),
247+
))
248248
}
249249
}
250250

@@ -265,18 +265,26 @@ where
265265
let (msg, _) = self.take()?;
266266
cb(msg)
267267
}
268-
AnySubscriptionCallback::Boxed(cb) => {
269-
let (msg, _) = self.take_boxed()?;
270-
cb(msg)
271-
}
272268
AnySubscriptionCallback::RegularWithMessageInfo(cb) => {
273269
let (msg, msg_info) = self.take()?;
274270
cb(msg, msg_info)
275271
}
272+
AnySubscriptionCallback::Boxed(cb) => {
273+
let (msg, _) = self.take_boxed()?;
274+
cb(msg)
275+
}
276276
AnySubscriptionCallback::BoxedWithMessageInfo(cb) => {
277277
let (msg, msg_info) = self.take_boxed()?;
278278
cb(msg, msg_info)
279279
}
280+
AnySubscriptionCallback::Loaned(cb) => {
281+
let (msg, _) = self.take_loaned()?;
282+
cb(msg)
283+
}
284+
AnySubscriptionCallback::LoanedWithMessageInfo(cb) => {
285+
let (msg, msg_info) = self.take_loaned()?;
286+
cb(msg, msg_info)
287+
}
280288
}
281289
Ok(())
282290
})() {

rclrs/src/subscription/callback.rs

Lines changed: 72 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
11
use super::MessageInfo;
2+
use crate::ReadOnlyLoanedMessage;
3+
4+
use rosidl_runtime_rs::Message;
25

36
/// A trait for allowed callbacks for subscriptions.
47
///
58
/// See [`AnySubscriptionCallback`] for a list of possible callback signatures.
6-
pub trait SubscriptionCallback<T, Args>: Send {
9+
pub trait SubscriptionCallback<T, Args>: Send + 'static
10+
where
11+
T: Message,
12+
{
713
/// Converts the callback into an enum.
814
///
915
/// User code never needs to call this function.
@@ -13,7 +19,10 @@ pub trait SubscriptionCallback<T, Args>: Send {
1319
/// An enum capturing the various possible function signatures for subscription callbacks.
1420
///
1521
/// The correct enum variant is deduced by the [`SubscriptionCallback`] trait.
16-
pub enum AnySubscriptionCallback<T> {
22+
pub enum AnySubscriptionCallback<T>
23+
where
24+
T: Message,
25+
{
1726
/// A callback with only the message as an argument.
1827
Regular(Box<dyn FnMut(T) + Send>),
1928
/// A callback with the message and the message info as arguments.
@@ -22,92 +31,104 @@ pub enum AnySubscriptionCallback<T> {
2231
Boxed(Box<dyn FnMut(Box<T>) + Send>),
2332
/// A callback with the boxed message and the message info as arguments.
2433
BoxedWithMessageInfo(Box<dyn FnMut(Box<T>, MessageInfo) + Send>),
34+
/// A callback with only the loaned message as an argument.
35+
#[allow(clippy::type_complexity)]
36+
Loaned(Box<dyn for<'a> FnMut(ReadOnlyLoanedMessage<'a, T>) + Send>),
37+
/// A callback with the loaned message and the message info as arguments.
38+
#[allow(clippy::type_complexity)]
39+
LoanedWithMessageInfo(Box<dyn for<'a> FnMut(ReadOnlyLoanedMessage<'a, T>, MessageInfo) + Send>),
2540
}
2641

2742
// We need one implementation per arity. This was inspired by Bevy's systems.
2843
impl<T, A0, Func> SubscriptionCallback<T, (A0,)> for Func
2944
where
3045
Func: FnMut(A0) + Send + 'static,
31-
(A0,): ArgTuple<T, Func = Box<dyn FnMut(A0) + Send>>,
46+
(A0,): ArgTuple<T, Func>,
47+
T: Message,
3248
{
3349
fn into_callback(self) -> AnySubscriptionCallback<T> {
34-
<(A0,) as ArgTuple<T>>::into_callback_with_args(Box::new(self))
50+
<(A0,) as ArgTuple<T, Func>>::into_callback_with_args(self)
3551
}
3652
}
3753

3854
impl<T, A0, A1, Func> SubscriptionCallback<T, (A0, A1)> for Func
3955
where
4056
Func: FnMut(A0, A1) + Send + 'static,
41-
(A0, A1): ArgTuple<T, Func = Box<dyn FnMut(A0, A1) + Send>>,
57+
(A0, A1): ArgTuple<T, Func>,
58+
T: Message,
4259
{
4360
fn into_callback(self) -> AnySubscriptionCallback<T> {
44-
<(A0, A1) as ArgTuple<T>>::into_callback_with_args(Box::new(self))
61+
<(A0, A1) as ArgTuple<T, Func>>::into_callback_with_args(self)
4562
}
4663
}
4764

4865
// Helper trait for SubscriptionCallback.
4966
//
5067
// For each tuple of args, it provides conversion from a function with
5168
// these args to the correct enum variant.
52-
trait ArgTuple<T> {
53-
type Func;
54-
fn into_callback_with_args(func: Self::Func) -> AnySubscriptionCallback<T>;
69+
trait ArgTuple<T, Func>
70+
where
71+
T: Message,
72+
{
73+
fn into_callback_with_args(func: Func) -> AnySubscriptionCallback<T>;
5574
}
5675

57-
impl<T> ArgTuple<T> for (Box<T>,) {
58-
type Func = Box<dyn FnMut(Box<T>) + Send>;
59-
fn into_callback_with_args(func: Self::Func) -> AnySubscriptionCallback<T> {
60-
AnySubscriptionCallback::Boxed(func)
76+
impl<T, Func> ArgTuple<T, Func> for (T,)
77+
where
78+
T: Message,
79+
Func: FnMut(T) + Send + 'static,
80+
{
81+
fn into_callback_with_args(func: Func) -> AnySubscriptionCallback<T> {
82+
AnySubscriptionCallback::Regular(Box::new(func))
6183
}
6284
}
6385

64-
impl<T> ArgTuple<T> for (Box<T>, MessageInfo) {
65-
type Func = Box<dyn FnMut(Box<T>, MessageInfo) + Send>;
66-
fn into_callback_with_args(func: Self::Func) -> AnySubscriptionCallback<T> {
67-
AnySubscriptionCallback::BoxedWithMessageInfo(func)
86+
impl<T, Func> ArgTuple<T, Func> for (T, MessageInfo)
87+
where
88+
T: Message,
89+
Func: FnMut(T, MessageInfo) + Send + 'static,
90+
{
91+
fn into_callback_with_args(func: Func) -> AnySubscriptionCallback<T> {
92+
AnySubscriptionCallback::RegularWithMessageInfo(Box::new(func))
6893
}
6994
}
7095

71-
impl<T> ArgTuple<T> for (T,) {
72-
type Func = Box<dyn FnMut(T) + Send>;
73-
fn into_callback_with_args(func: Self::Func) -> AnySubscriptionCallback<T> {
74-
AnySubscriptionCallback::Regular(func)
96+
impl<T, Func> ArgTuple<T, Func> for (Box<T>,)
97+
where
98+
T: Message,
99+
Func: FnMut(Box<T>) + Send + 'static,
100+
{
101+
fn into_callback_with_args(func: Func) -> AnySubscriptionCallback<T> {
102+
AnySubscriptionCallback::Boxed(Box::new(func))
75103
}
76104
}
77105

78-
impl<T> ArgTuple<T> for (T, MessageInfo) {
79-
type Func = Box<dyn FnMut(T, MessageInfo) + Send>;
80-
fn into_callback_with_args(func: Self::Func) -> AnySubscriptionCallback<T> {
81-
AnySubscriptionCallback::RegularWithMessageInfo(func)
106+
impl<T, Func> ArgTuple<T, Func> for (Box<T>, MessageInfo)
107+
where
108+
T: Message,
109+
Func: FnMut(Box<T>, MessageInfo) + Send + 'static,
110+
{
111+
fn into_callback_with_args(func: Func) -> AnySubscriptionCallback<T> {
112+
AnySubscriptionCallback::BoxedWithMessageInfo(Box::new(func))
82113
}
83114
}
84115

85-
#[cfg(test)]
86-
mod tests {
87-
use super::*;
116+
impl<T, Func> ArgTuple<T, Func> for (ReadOnlyLoanedMessage<'_, T>,)
117+
where
118+
T: Message,
119+
Func: for<'b> FnMut(ReadOnlyLoanedMessage<'b, T>) + Send + 'static,
120+
{
121+
fn into_callback_with_args(func: Func) -> AnySubscriptionCallback<T> {
122+
AnySubscriptionCallback::Loaned(Box::new(func))
123+
}
124+
}
88125

89-
#[test]
90-
fn callback_conversion() {
91-
struct Message;
92-
let cb = |_msg: Message| {};
93-
assert!(matches!(
94-
cb.into_callback(),
95-
AnySubscriptionCallback::<Message>::Regular(_)
96-
));
97-
let cb = |_msg: Message, _info: MessageInfo| {};
98-
assert!(matches!(
99-
cb.into_callback(),
100-
AnySubscriptionCallback::<Message>::RegularWithMessageInfo(_)
101-
));
102-
let cb = |_msg: Box<Message>| {};
103-
assert!(matches!(
104-
cb.into_callback(),
105-
AnySubscriptionCallback::<Message>::Boxed(_)
106-
));
107-
let cb = |_msg: Box<Message>, _info: MessageInfo| {};
108-
assert!(matches!(
109-
cb.into_callback(),
110-
AnySubscriptionCallback::<Message>::BoxedWithMessageInfo(_)
111-
));
126+
impl<T, Func> ArgTuple<T, Func> for (ReadOnlyLoanedMessage<'_, T>, MessageInfo)
127+
where
128+
T: Message,
129+
Func: for<'b> FnMut(ReadOnlyLoanedMessage<'b, T>, MessageInfo) + Send + 'static,
130+
{
131+
fn into_callback_with_args(func: Func) -> AnySubscriptionCallback<T> {
132+
AnySubscriptionCallback::LoanedWithMessageInfo(Box::new(func))
112133
}
113134
}
Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,41 @@
11
use crate::rcl_bindings::*;
22
use crate::{Subscription, ToResult};
33

4-
use rosidl_runtime_rs::RmwMessage;
4+
use rosidl_runtime_rs::Message;
55

66
use std::ops::Deref;
77

88
/// A message that is owned by the middleware, loaned out for reading.
99
///
10-
/// It dereferences to a `&T`.
10+
/// It dereferences to a `&T::RmwMsg`. That is, if `T` is already an RMW-native
11+
/// message, it's the same as `&T`, and otherwise it's the corresponding RMW-native
12+
/// message.
1113
///
12-
/// This type is returned by [`Subscription::take_loaned_message()`].
14+
/// This type is returned by [`Subscription::take_loaned()`] and may be used in
15+
/// subscription callbacks.
1316
///
1417
/// The loan is returned by dropping the `ReadOnlyLoanedMessage`.
1518
pub struct ReadOnlyLoanedMessage<'a, T>
1619
where
17-
T: RmwMessage,
20+
T: Message,
1821
{
19-
pub(super) msg_ptr: *const T,
22+
pub(super) msg_ptr: *const T::RmwMsg,
2023
pub(super) subscription: &'a Subscription<T>,
2124
}
2225

2326
impl<'a, T> Deref for ReadOnlyLoanedMessage<'a, T>
2427
where
25-
T: RmwMessage,
28+
T: Message,
2629
{
27-
type Target = T;
30+
type Target = T::RmwMsg;
2831
fn deref(&self) -> &Self::Target {
2932
unsafe { &*self.msg_ptr }
3033
}
3134
}
3235

3336
impl<'a, T> Drop for ReadOnlyLoanedMessage<'a, T>
3437
where
35-
T: RmwMessage,
38+
T: Message,
3639
{
3740
fn drop(&mut self) {
3841
unsafe {
@@ -48,6 +51,6 @@ where
4851

4952
// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
5053
// they are running in. Therefore, this type can be safely sent to another thread.
51-
unsafe impl<'a, T> Send for ReadOnlyLoanedMessage<'a, T> where T: RmwMessage {}
54+
unsafe impl<'a, T> Send for ReadOnlyLoanedMessage<'a, T> where T: Message {}
5255
// SAFETY: This type has no interior mutability, in fact it has no mutability at all.
53-
unsafe impl<'a, T> Sync for ReadOnlyLoanedMessage<'a, T> where T: RmwMessage {}
56+
unsafe impl<'a, T> Sync for ReadOnlyLoanedMessage<'a, T> where T: Message {}

rclrs_tests/src/pub_sub_tests.rs

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use rclrs::{LoanedMessage, Publisher, ReadOnlyLoanedMessage, Subscription};
1+
use rclrs::{
2+
AnySubscriptionCallback, LoanedMessage, MessageInfo, Publisher, ReadOnlyLoanedMessage,
3+
Subscription, SubscriptionCallback,
4+
};
25

36
fn assert_send<T: Send>() {}
47
fn assert_sync<T: Sync>() {}
@@ -26,3 +29,38 @@ fn readonly_loaned_message_is_send_and_sync() {
2629
assert_send::<ReadOnlyLoanedMessage<test_msgs::msg::rmw::BoundedSequences>>();
2730
assert_sync::<ReadOnlyLoanedMessage<test_msgs::msg::rmw::BoundedSequences>>();
2831
}
32+
33+
#[test]
34+
fn callback_conversion() {
35+
type Message = test_msgs::msg::BoundedSequences;
36+
let cb = |_msg: Message| {};
37+
assert!(matches!(
38+
cb.into_callback(),
39+
AnySubscriptionCallback::<Message>::Regular(_)
40+
));
41+
let cb = |_msg: Message, _info: MessageInfo| {};
42+
assert!(matches!(
43+
cb.into_callback(),
44+
AnySubscriptionCallback::<Message>::RegularWithMessageInfo(_)
45+
));
46+
let cb = |_msg: Box<Message>| {};
47+
assert!(matches!(
48+
cb.into_callback(),
49+
AnySubscriptionCallback::<Message>::Boxed(_)
50+
));
51+
let cb = |_msg: Box<Message>, _info: MessageInfo| {};
52+
assert!(matches!(
53+
cb.into_callback(),
54+
AnySubscriptionCallback::<Message>::BoxedWithMessageInfo(_)
55+
));
56+
let cb = |_msg: ReadOnlyLoanedMessage<'_, Message>| {};
57+
assert!(matches!(
58+
cb.into_callback(),
59+
AnySubscriptionCallback::<Message>::Loaned(_)
60+
));
61+
let cb = |_msg: ReadOnlyLoanedMessage<'_, Message>, _info: MessageInfo| {};
62+
assert!(matches!(
63+
cb.into_callback(),
64+
AnySubscriptionCallback::<Message>::LoanedWithMessageInfo(_)
65+
));
66+
}

0 commit comments

Comments
 (0)