Skip to content

Commit 1c07227

Browse files
authored
Add support for loaned messages (#212)
1 parent f571735 commit 1c07227

File tree

9 files changed

+288
-3
lines changed

9 files changed

+288
-3
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ Features and limitations
1717
The current set of features include:
1818
- Message generation
1919
- Support for publishers and subscriptions
20+
- Loaned messages (zero-copy)
2021
- Tunable QoS settings
2122
- Clients and services
2223

examples/minimal_pub_sub/Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,14 @@ path = "src/minimal_subscriber.rs"
1313
name = "minimal_publisher"
1414
path = "src/minimal_publisher.rs"
1515

16+
[[bin]]
17+
name = "zero_copy_subscriber"
18+
path = "src/zero_copy_subscriber.rs"
19+
20+
[[bin]]
21+
name = "zero_copy_publisher"
22+
path = "src/zero_copy_publisher.rs"
23+
1624
[dependencies]
1725
anyhow = {version = "1", features = ["backtrace"]}
1826

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
use anyhow::{Error, Result};
2+
use std::env;
3+
4+
fn main() -> Result<(), Error> {
5+
let context = rclrs::Context::new(env::args())?;
6+
7+
let node = rclrs::create_node(&context, "minimal_publisher")?;
8+
9+
let publisher =
10+
node.create_publisher::<std_msgs::msg::rmw::UInt32>("topic", rclrs::QOS_PROFILE_DEFAULT)?;
11+
12+
let mut publish_count: u32 = 1;
13+
14+
while context.ok() {
15+
let mut message = publisher.borrow_loaned_message()?;
16+
message.data = publish_count;
17+
println!("Publishing: {}", message.data);
18+
message.publish()?;
19+
publish_count += 1;
20+
std::thread::sleep(std::time::Duration::from_millis(500));
21+
}
22+
Ok(())
23+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
use std::env;
2+
3+
use anyhow::{Error, Result};
4+
5+
fn main() -> Result<(), Error> {
6+
let context = rclrs::Context::new(env::args())?;
7+
8+
let mut node = rclrs::create_node(&context, "minimal_subscriber")?;
9+
10+
let mut num_messages: usize = 0;
11+
12+
let _subscription = node.create_subscription::<std_msgs::msg::UInt32, _>(
13+
"topic",
14+
rclrs::QOS_PROFILE_DEFAULT,
15+
move |msg: std_msgs::msg::UInt32| {
16+
num_messages += 1;
17+
println!("I heard: '{}'", msg.data);
18+
println!("(Got {} messages so far)", num_messages);
19+
},
20+
)?;
21+
22+
rclrs::spin(&node).map_err(|err| err.into())
23+
}

rclrs/src/node/publisher.rs

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ use std::sync::{Arc, Mutex};
1111

1212
use rosidl_runtime_rs::{Message, RmwMessage};
1313

14+
mod loaned_message;
15+
pub use loaned_message::*;
16+
1417
// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
1518
// they are running in. Therefore, this type can be safely sent to another thread.
1619
unsafe impl Send for rcl_publisher_t {}
@@ -31,6 +34,9 @@ where
3134
{
3235
rcl_publisher_mtx: Mutex<rcl_publisher_t>,
3336
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
37+
// The data pointed to by type_support_ptr has static lifetime;
38+
// it is global data in the type support library.
39+
type_support_ptr: *const rosidl_message_type_support_t,
3440
message: PhantomData<T>,
3541
}
3642

@@ -62,7 +68,7 @@ where
6268
{
6369
// SAFETY: Getting a zero-initialized value is always safe.
6470
let mut rcl_publisher = unsafe { rcl_get_zero_initialized_publisher() };
65-
let type_support =
71+
let type_support_ptr =
6672
<T as Message>::RmwMsg::get_type_support() as *const rosidl_message_type_support_t;
6773
let topic_c_string = CString::new(topic).map_err(|err| RclrsError::StringContainsNul {
6874
err,
@@ -82,7 +88,7 @@ where
8288
rcl_publisher_init(
8389
&mut rcl_publisher,
8490
rcl_node,
85-
type_support,
91+
type_support_ptr,
8692
topic_c_string.as_ptr(),
8793
&publisher_options,
8894
)
@@ -92,6 +98,7 @@ where
9298
Ok(Self {
9399
rcl_publisher_mtx: Mutex::new(rcl_publisher),
94100
rcl_node_mtx: Arc::clone(&node.rcl_node_mtx),
101+
type_support_ptr,
95102
message: PhantomData,
96103
})
97104
}
@@ -145,6 +152,57 @@ where
145152
}
146153
}
147154

155+
impl<T> Publisher<T>
156+
where
157+
T: RmwMessage,
158+
{
159+
/// Obtains a writable handle to a message owned by the middleware.
160+
///
161+
/// This lets the middleware control how and where to allocate memory for the
162+
/// message.
163+
/// The purpose of this is typically to achieve *zero-copy communication* between publishers and
164+
/// subscriptions on the same machine: the message is placed directly in a shared memory region,
165+
/// and a reference to the same memory is returned by [`Subscription::take_loaned_message()`][1].
166+
/// No copying or serialization/deserialization takes place, which is much more efficient,
167+
/// especially as the message size grows.
168+
///
169+
/// # Conditions for zero-copy communication
170+
/// 1. A middleware with support for shared memory is used, e.g. `CycloneDDS` with `iceoryx`
171+
/// 1. Shared memory transport is enabled in the middleware configuration
172+
/// 1. Publishers and subscriptions are on the same machine
173+
/// 1. The message is a "plain old data" type containing no variable-size members, whether bounded or unbounded
174+
/// 1. The publisher's QOS settings are compatible with zero-copy, e.g. the [default QOS][2]
175+
/// 1. `Publisher::borrow_loaned_message()` and [`Subscription::take_loaned_message()`][1] are used
176+
///
177+
/// This function is only implemented for [`RmwMessage`]s since the "idiomatic" message type
178+
/// does not have a typesupport library.
179+
///
180+
/// [1]: crate::Subscription::take_loaned_message
181+
/// [2]: crate::QOS_PROFILE_DEFAULT
182+
//
183+
// TODO: Explain more, e.g.
184+
// - Zero-copy communication between rclcpp and rclrs possible?
185+
// - What errors occur when?
186+
// - What happens when only *some* subscribers are local?
187+
// - What QOS settings are required exactly? https://cyclonedds.io/docs/cyclonedds/latest/shared_memory.html
188+
pub fn borrow_loaned_message(&self) -> Result<LoanedMessage<'_, T>, RclrsError> {
189+
let mut msg_ptr = std::ptr::null_mut();
190+
unsafe {
191+
// SAFETY: msg_ptr contains a null ptr as expected by this function.
192+
rcl_borrow_loaned_message(
193+
&*self.rcl_publisher_mtx.lock().unwrap(),
194+
self.type_support_ptr,
195+
&mut msg_ptr,
196+
)
197+
.ok()?;
198+
}
199+
Ok(LoanedMessage {
200+
publisher: self,
201+
msg_ptr: msg_ptr as *mut T,
202+
})
203+
}
204+
}
205+
148206
/// Convenience trait for [`Publisher::publish`].
149207
pub trait MessageCow<'a, T: Message> {
150208
/// Wrap the owned or borrowed message in a `Cow`.
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
use crate::rcl_bindings::*;
2+
use crate::{Publisher, RclrsError, ToResult};
3+
4+
use rosidl_runtime_rs::RmwMessage;
5+
6+
use std::ops::{Deref, DerefMut};
7+
8+
/// A message that is owned by the middleware, loaned for publishing.
9+
///
10+
/// It dereferences to a `&mut T`.
11+
///
12+
/// This type is returned by [`Publisher::borrow_loaned_message()`], see the documentation of
13+
/// that function for more information.
14+
///
15+
/// The loan is returned by dropping the message or [publishing it][1].
16+
///
17+
/// [1]: LoanedMessage::publish
18+
pub struct LoanedMessage<'a, T>
19+
where
20+
T: RmwMessage,
21+
{
22+
pub(super) msg_ptr: *mut T,
23+
pub(super) publisher: &'a Publisher<T>,
24+
}
25+
26+
impl<'a, T> Deref for LoanedMessage<'a, T>
27+
where
28+
T: RmwMessage,
29+
{
30+
type Target = T;
31+
fn deref(&self) -> &Self::Target {
32+
// SAFETY: msg_ptr is a valid pointer, obtained through rcl_borrow_loaned_message.
33+
unsafe { &*self.msg_ptr }
34+
}
35+
}
36+
37+
impl<'a, T> DerefMut for LoanedMessage<'a, T>
38+
where
39+
T: RmwMessage,
40+
{
41+
fn deref_mut(&mut self) -> &mut Self::Target {
42+
// SAFETY: msg_ptr is a valid pointer, obtained through rcl_borrow_loaned_message.
43+
unsafe { &mut *self.msg_ptr }
44+
}
45+
}
46+
47+
impl<'a, T> Drop for LoanedMessage<'a, T>
48+
where
49+
T: RmwMessage,
50+
{
51+
fn drop(&mut self) {
52+
// Check whether the loan was already returned with
53+
// rcl_publish_loaned_message()
54+
if !self.msg_ptr.is_null() {
55+
unsafe {
56+
// SAFETY: These two pointers are valid, and the msg_ptr is not used afterwards.
57+
rcl_return_loaned_message_from_publisher(
58+
&*self.publisher.rcl_publisher_mtx.lock().unwrap(),
59+
self.msg_ptr as *mut _,
60+
)
61+
.ok()
62+
.unwrap()
63+
}
64+
}
65+
}
66+
}
67+
68+
impl<'a, T> LoanedMessage<'a, T>
69+
where
70+
T: RmwMessage,
71+
{
72+
/// Publishes the loaned message, falling back to regular publishing if needed.
73+
pub fn publish(mut self) -> Result<(), RclrsError> {
74+
unsafe {
75+
// SAFETY: These two pointers are valid, and the msg_ptr is not used afterwards.
76+
rcl_publish_loaned_message(
77+
&*self.publisher.rcl_publisher_mtx.lock().unwrap(),
78+
self.msg_ptr as *mut _,
79+
std::ptr::null_mut(),
80+
)
81+
.ok()?;
82+
}
83+
// Set the msg_ptr to null, as a signal to the drop impl that this
84+
// loan was already returned.
85+
self.msg_ptr = std::ptr::null_mut();
86+
Ok(())
87+
}
88+
}

rclrs/src/node/subscription.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ use std::sync::{Arc, Mutex, MutexGuard};
1212

1313
use rosidl_runtime_rs::{Message, RmwMessage};
1414

15+
mod readonly_loaned_message;
16+
pub use readonly_loaned_message::*;
17+
1518
// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
1619
// they are running in. Therefore, this type can be safely sent to another thread.
1720
unsafe impl Send for rcl_subscription_t {}
@@ -187,6 +190,40 @@ where
187190
}
188191
}
189192

193+
impl<T> Subscription<T>
194+
where
195+
T: RmwMessage,
196+
{
197+
/// Obtains a read-only handle to a message owned by the middleware.
198+
///
199+
/// When there is no new message, this will return a
200+
/// [`SubscriptionTakeFailed`][1].
201+
///
202+
/// This is the counterpart to [`Publisher::borrow_loaned_message()`][2]. See its documentation
203+
/// for more information.
204+
///
205+
/// [1]: crate::RclrsError
206+
/// [2]: crate::Publisher::borrow_loaned_message
207+
pub fn take_loaned_message(&self) -> Result<ReadOnlyLoanedMessage<'_, T>, RclrsError> {
208+
let mut msg_ptr = std::ptr::null_mut();
209+
unsafe {
210+
// SAFETY: The third argument (message_info) and fourth argument (allocation) may be null.
211+
// The second argument (loaned_message) contains a null ptr as expected.
212+
rcl_take_loaned_message(
213+
&*self.handle.lock(),
214+
&mut msg_ptr,
215+
std::ptr::null_mut(),
216+
std::ptr::null_mut(),
217+
)
218+
.ok()?;
219+
}
220+
Ok(ReadOnlyLoanedMessage {
221+
msg_ptr: msg_ptr as *const T,
222+
subscription: self,
223+
})
224+
}
225+
}
226+
190227
impl<T> SubscriptionBase for Subscription<T>
191228
where
192229
T: Message,
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use crate::rcl_bindings::*;
2+
use crate::{Subscription, ToResult};
3+
4+
use rosidl_runtime_rs::RmwMessage;
5+
6+
use std::ops::Deref;
7+
8+
/// A message that is owned by the middleware, loaned out for reading.
9+
///
10+
/// It dereferences to a `&T`.
11+
///
12+
/// This type is returned by [`Subscription::take_loaned_message()`].
13+
///
14+
/// The loan is returned by dropping the `ReadOnlyLoanedMessage`.
15+
pub struct ReadOnlyLoanedMessage<'a, T>
16+
where
17+
T: RmwMessage,
18+
{
19+
pub(super) msg_ptr: *const T,
20+
pub(super) subscription: &'a Subscription<T>,
21+
}
22+
23+
impl<'a, T> Deref for ReadOnlyLoanedMessage<'a, T>
24+
where
25+
T: RmwMessage,
26+
{
27+
type Target = T;
28+
fn deref(&self) -> &Self::Target {
29+
unsafe { &*self.msg_ptr }
30+
}
31+
}
32+
33+
impl<'a, T> Drop for ReadOnlyLoanedMessage<'a, T>
34+
where
35+
T: RmwMessage,
36+
{
37+
fn drop(&mut self) {
38+
unsafe {
39+
rcl_return_loaned_message_from_subscription(
40+
&*self.subscription.handle.lock(),
41+
self.msg_ptr as *mut _,
42+
)
43+
.ok()
44+
.unwrap();
45+
}
46+
}
47+
}

rosidl_runtime_rs/src/traits.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ pub trait SequenceAlloc: Sized {
3737
/// used by user code.
3838
///
3939
/// User code never needs to call this trait's method, much less implement this trait.
40-
pub trait RmwMessage: Clone + Debug + Default + Send + Sync {
40+
pub trait RmwMessage: Clone + Debug + Default + Send + Sync + Message {
4141
/// Get a pointer to the correct `rosidl_message_type_support_t` structure.
4242
fn get_type_support() -> libc::uintptr_t;
4343
}

0 commit comments

Comments
 (0)