Skip to content

Commit cad6964

Browse files
Added support for Guard Conditions (#249)
* Added support for Guard Conditions
1 parent d71fda4 commit cad6964

File tree

6 files changed

+322
-4
lines changed

6 files changed

+322
-4
lines changed

rclrs/CHANGELOG.rst

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ Changelog for package rclrs
55
0.3
66
----------------
77
* Loaned messages (zero-copy) (`#212 <https://github.com/ros2-rust/ros2_rust/pull/212>`_)
8+
* Graph queries (`#234 <https://github.com/ros2-rust/ros2_rust/pull/234>`_)
9+
* Guard conditions (`#249 <https://github.com/ros2-rust/ros2_rust/pull/249>`_)
810

911
0.2 (2022-07-21)
1012
----------------
1113
* First release
1214
* Build based on `colcon-ros-cargo`
1315
* Message generation packages `rosidl_generator_rs` and `rosidl_runtime_rs`
1416
* Publisher, Subscription, Client and Service
15-
* Tunable QoS settings
17+
* Tunable QoS settings

rclrs/src/lib.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,14 @@ pub use rcl_bindings::rmw_request_id_t;
4949
pub fn spin_once(node: &Node, timeout: Option<Duration>) -> Result<(), RclrsError> {
5050
let live_subscriptions = node.live_subscriptions();
5151
let live_clients = node.live_clients();
52+
let live_guard_conditions = node.live_guard_conditions();
5253
let live_services = node.live_services();
5354
let ctx = Context {
5455
rcl_context_mtx: node.rcl_context_mtx.clone(),
5556
};
5657
let mut wait_set = WaitSet::new(
5758
live_subscriptions.len(),
58-
0,
59+
live_guard_conditions.len(),
5960
0,
6061
live_clients.len(),
6162
live_services.len(),
@@ -71,6 +72,10 @@ pub fn spin_once(node: &Node, timeout: Option<Duration>) -> Result<(), RclrsErro
7172
wait_set.add_client(live_client.clone())?;
7273
}
7374

75+
for live_guard_condition in &live_guard_conditions {
76+
wait_set.add_guard_condition(live_guard_condition.clone())?;
77+
}
78+
7479
for live_service in &live_services {
7580
wait_set.add_service(live_service.clone())?;
7681
}

rclrs/src/node.rs

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ pub use self::graph::*;
55

66
use crate::rcl_bindings::*;
77
use crate::{
8-
Client, ClientBase, Context, ParameterOverrideMap, Publisher, QoSProfile, RclrsError, Service,
9-
ServiceBase, Subscription, SubscriptionBase, SubscriptionCallback, ToResult,
8+
Client, ClientBase, Context, GuardCondition, ParameterOverrideMap, Publisher, QoSProfile,
9+
RclrsError, Service, ServiceBase, Subscription, SubscriptionBase, SubscriptionCallback,
10+
ToResult,
1011
};
1112

1213
use std::cmp::PartialEq;
@@ -68,6 +69,7 @@ pub struct Node {
6869
pub(crate) rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
6970
pub(crate) rcl_context_mtx: Arc<Mutex<rcl_context_t>>,
7071
pub(crate) clients: Vec<Weak<dyn ClientBase>>,
72+
pub(crate) guard_conditions: Vec<Weak<GuardCondition>>,
7173
pub(crate) services: Vec<Weak<dyn ServiceBase>>,
7274
pub(crate) subscriptions: Vec<Weak<dyn SubscriptionBase>>,
7375
_parameter_map: ParameterOverrideMap,
@@ -189,6 +191,47 @@ impl Node {
189191
Ok(client)
190192
}
191193

194+
/// Creates a [`GuardCondition`][1] with no callback.
195+
///
196+
/// A weak pointer to the `GuardCondition` is stored within this node.
197+
/// When this node is added to a wait set (e.g. when calling `spin_once`[2]
198+
/// with this node as an argument), the guard condition can be used to
199+
/// interrupt the wait.
200+
///
201+
/// [1]: crate::GuardCondition
202+
/// [2]: crate::spin_once
203+
pub fn create_guard_condition(&mut self) -> Arc<GuardCondition> {
204+
let guard_condition = Arc::new(GuardCondition::new_with_rcl_context(
205+
&mut self.rcl_context_mtx.lock().unwrap(),
206+
None,
207+
));
208+
self.guard_conditions
209+
.push(Arc::downgrade(&guard_condition) as Weak<GuardCondition>);
210+
guard_condition
211+
}
212+
213+
/// Creates a [`GuardCondition`][1] with a callback.
214+
///
215+
/// A weak pointer to the `GuardCondition` is stored within this node.
216+
/// When this node is added to a wait set (e.g. when calling `spin_once`[2]
217+
/// with this node as an argument), the guard condition can be used to
218+
/// interrupt the wait.
219+
///
220+
/// [1]: crate::GuardCondition
221+
/// [2]: crate::spin_once
222+
pub fn create_guard_condition_with_callback<F>(&mut self, callback: F) -> Arc<GuardCondition>
223+
where
224+
F: Fn() + Send + Sync + 'static,
225+
{
226+
let guard_condition = Arc::new(GuardCondition::new_with_rcl_context(
227+
&mut self.rcl_context_mtx.lock().unwrap(),
228+
Some(Box::new(callback) as Box<dyn Fn() + Send + Sync>),
229+
));
230+
self.guard_conditions
231+
.push(Arc::downgrade(&guard_condition) as Weak<GuardCondition>);
232+
guard_condition
233+
}
234+
192235
/// Creates a [`Publisher`][1].
193236
///
194237
/// [1]: crate::Publisher
@@ -254,6 +297,13 @@ impl Node {
254297
self.clients.iter().filter_map(Weak::upgrade).collect()
255298
}
256299

300+
pub(crate) fn live_guard_conditions(&self) -> Vec<Arc<GuardCondition>> {
301+
self.guard_conditions
302+
.iter()
303+
.filter_map(Weak::upgrade)
304+
.collect()
305+
}
306+
257307
pub(crate) fn live_services(&self) -> Vec<Arc<dyn ServiceBase>> {
258308
self.services.iter().filter_map(Weak::upgrade).collect()
259309
}

rclrs/src/node/builder.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,7 @@ impl NodeBuilder {
276276
rcl_node_mtx,
277277
rcl_context_mtx: self.context.clone(),
278278
clients: vec![],
279+
guard_conditions: vec![],
279280
services: vec![],
280281
subscriptions: vec![],
281282
_parameter_map,

rclrs/src/wait.rs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ use std::time::Duration;
2424
use std::vec::Vec;
2525

2626
mod exclusivity_guard;
27+
mod guard_condition;
2728
use exclusivity_guard::*;
29+
pub use guard_condition::*;
2830

2931
/// A struct for waiting on subscriptions and other waitable entities to become ready.
3032
pub struct WaitSet {
@@ -36,6 +38,8 @@ pub struct WaitSet {
3638
// even in the error case.
3739
subscriptions: Vec<ExclusivityGuard<Arc<dyn SubscriptionBase>>>,
3840
clients: Vec<ExclusivityGuard<Arc<dyn ClientBase>>>,
41+
// The guard conditions that are currently registered in the wait set.
42+
guard_conditions: Vec<ExclusivityGuard<Arc<GuardCondition>>>,
3943
services: Vec<ExclusivityGuard<Arc<dyn ServiceBase>>>,
4044
}
4145

@@ -45,6 +49,8 @@ pub struct ReadyEntities {
4549
pub subscriptions: Vec<Arc<dyn SubscriptionBase>>,
4650
/// A list of clients that have potentially received responses.
4751
pub clients: Vec<Arc<dyn ClientBase>>,
52+
/// A list of guard conditions that have been triggered.
53+
pub guard_conditions: Vec<Arc<GuardCondition>>,
4854
/// A list of services that have potentially received requests.
4955
pub services: Vec<Arc<dyn ServiceBase>>,
5056
}
@@ -105,6 +111,7 @@ impl WaitSet {
105111
rcl_wait_set,
106112
_rcl_context_mtx: context.rcl_context_mtx.clone(),
107113
subscriptions: Vec::new(),
114+
guard_conditions: Vec::new(),
108115
clients: Vec::new(),
109116
services: Vec::new(),
110117
})
@@ -116,6 +123,7 @@ impl WaitSet {
116123
/// [`WaitSet::new`].
117124
pub fn clear(&mut self) {
118125
self.subscriptions.clear();
126+
self.guard_conditions.clear();
119127
self.clients.clear();
120128
self.services.clear();
121129
// This cannot fail – the rcl_wait_set_clear function only checks that the input handle is
@@ -159,6 +167,38 @@ impl WaitSet {
159167
Ok(())
160168
}
161169

170+
/// Adds a guard condition to the wait set.
171+
///
172+
/// # Errors
173+
/// - If the guard condition was already added to this wait set or another one,
174+
/// [`AlreadyAddedToWaitSet`][1] will be returned
175+
/// - If the number of guard conditions in the wait set is larger than the
176+
/// capacity set in [`WaitSet::new`], [`WaitSetFull`][2] will be returned
177+
///
178+
/// [1]: crate::RclrsError
179+
/// [2]: crate::RclReturnCode
180+
pub fn add_guard_condition(
181+
&mut self,
182+
guard_condition: Arc<GuardCondition>,
183+
) -> Result<(), RclrsError> {
184+
let exclusive_guard_condition = ExclusivityGuard::new(
185+
Arc::clone(&guard_condition),
186+
Arc::clone(&guard_condition.in_use_by_wait_set),
187+
)?;
188+
189+
unsafe {
190+
// SAFETY: Safe if the wait set and guard condition are initialized
191+
rcl_wait_set_add_guard_condition(
192+
&mut self.rcl_wait_set,
193+
&*guard_condition.rcl_guard_condition.lock().unwrap(),
194+
std::ptr::null_mut(),
195+
)
196+
.ok()?;
197+
}
198+
self.guard_conditions.push(exclusive_guard_condition);
199+
Ok(())
200+
}
201+
162202
/// Adds a client to the wait set.
163203
///
164204
/// # Errors
@@ -262,6 +302,7 @@ impl WaitSet {
262302
let mut ready_entities = ReadyEntities {
263303
subscriptions: Vec::new(),
264304
clients: Vec::new(),
305+
guard_conditions: Vec::new(),
265306
services: Vec::new(),
266307
};
267308
for (i, subscription) in self.subscriptions.iter().enumerate() {
@@ -275,6 +316,7 @@ impl WaitSet {
275316
.push(Arc::clone(&subscription.waitable));
276317
}
277318
}
319+
278320
for (i, client) in self.clients.iter().enumerate() {
279321
// SAFETY: The `clients` entry is an array of pointers, and this dereferencing is
280322
// equivalent to
@@ -284,6 +326,19 @@ impl WaitSet {
284326
ready_entities.clients.push(Arc::clone(&client.waitable));
285327
}
286328
}
329+
330+
for (i, guard_condition) in self.guard_conditions.iter().enumerate() {
331+
// SAFETY: The `clients` entry is an array of pointers, and this dereferencing is
332+
// equivalent to
333+
// https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419
334+
let wait_set_entry = unsafe { *self.rcl_wait_set.guard_conditions.add(i) };
335+
if !wait_set_entry.is_null() {
336+
ready_entities
337+
.guard_conditions
338+
.push(Arc::clone(&guard_condition.waitable));
339+
}
340+
}
341+
287342
for (i, service) in self.services.iter().enumerate() {
288343
// SAFETY: The `services` entry is an array of pointers, and this dereferencing is
289344
// equivalent to
@@ -309,4 +364,20 @@ mod tests {
309364
assert_send::<WaitSet>();
310365
assert_sync::<WaitSet>();
311366
}
367+
368+
#[test]
369+
fn guard_condition_in_wait_set_readies() -> Result<(), RclrsError> {
370+
let context = Context::new([])?;
371+
372+
let guard_condition = Arc::new(GuardCondition::new(&context));
373+
374+
let mut wait_set = WaitSet::new(0, 1, 0, 0, 0, 0, &context)?;
375+
wait_set.add_guard_condition(Arc::clone(&guard_condition))?;
376+
guard_condition.trigger()?;
377+
378+
let readies = wait_set.wait(Some(std::time::Duration::from_millis(10)))?;
379+
assert!(readies.guard_conditions.contains(&guard_condition));
380+
381+
Ok(())
382+
}
312383
}

0 commit comments

Comments
 (0)