Skip to content

Commit d62583d

Browse files
authored
Async Workers (#446)
* Squash async worker changes Signed-off-by: Michael X. Grey <greyxmike@gmail.com> Signed-off-by: Michael X. Grey <mxgrey@intrinsic.ai> * Fix bug in logging macros Signed-off-by: Michael X. Grey <mxgrey@intrinsic.ai> * Clean up examples using new API Signed-off-by: Michael X. Grey <mxgrey@intrinsic.ai> * Ensure workers wake up when a task is run Signed-off-by: Michael X. Grey <mxgrey@intrinsic.ai> * Iterating on documentation Signed-off-by: Michael X. Grey <mxgrey@intrinsic.ai> * Update docs for subscriptions Signed-off-by: Michael X. Grey <mxgrey@intrinsic.ai> * Fix create_worker bug Signed-off-by: Michael X. Grey <mxgrey@intrinsic.ai> * Add more documentation for Worker Signed-off-by: Michael X. Grey <mxgrey@intrinsic.ai> * Fix all todos that can be addressed for now Signed-off-by: Michael X. Grey <mxgrey@intrinsic.ai> * Add introductory documentation and a parameter demo Signed-off-by: Michael X. Grey <mxgrey@intrinsic.ai> * Add documentation for parameters Signed-off-by: Michael X. Grey <mxgrey@intrinsic.ai> * Fix formatting Signed-off-by: Michael X. Grey <mxgrey@intrinsic.ai> * Remove residual directory Signed-off-by: Michael X. Grey <mxgrey@intrinsic.ai> * Port over packaging fix Signed-off-by: Michael X. Grey <mxgrey@intrinsic.ai> * Fix formatting for examples Signed-off-by: Michael X. Grey <mxgrey@intrinsic.ai> * Fix distro divergence in guid type Signed-off-by: Michael X. Grey <mxgrey@intrinsic.ai> * Fix compilation bug in 1.75 Signed-off-by: Michael X. Grey <mxgrey@intrinsic.ai> * Force backtrace version that is compatible with 1.75 Signed-off-by: Michael X. Grey <mxgrey@intrinsic.ai> --------- Signed-off-by: Michael X. Grey <greyxmike@gmail.com> Signed-off-by: Michael X. Grey <mxgrey@intrinsic.ai>
1 parent 205b84b commit d62583d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+6680
-2405
lines changed

examples/logging_demo/Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
[package]
2+
name = "logging_demo"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
rclrs = "0.4"
8+
example_interfaces = "*"

examples/logging_demo/package.xml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?xml version="1.0"?>
2+
<?xml-model
3+
href="http://download.ros.org/schema/package_format3.xsd"
4+
schematypens="http://www.w3.org/2001/XMLSchema"?>
5+
<package format="3">
6+
<name>examples_logging_demo</name>
7+
<maintainer email="esteve@apache.org">Esteve Fernandez</maintainer>
8+
<!-- This project is not military-sponsored, Jacob's employment contract just requires him to use this email address -->
9+
<maintainer email="jacob.a.hassold.civ@army.mil">Jacob Hassold</maintainer>
10+
<version>0.4.1</version>
11+
<description>Package containing an example of how to use a worker in rclrs.</description>
12+
<license>Apache License 2.0</license>
13+
14+
<depend>rclrs</depend>
15+
<depend>rosidl_runtime_rs</depend>
16+
<depend>example_interfaces</depend>
17+
18+
<export>
19+
<build_type>ament_cargo</build_type>
20+
</export>
21+
</package>

examples/logging_demo/src/main.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
use rclrs::*;
2+
use std::time::Duration;
3+
4+
fn main() -> Result<(), RclrsError> {
5+
let mut executor = Context::default_from_env()?.create_basic_executor();
6+
let node = executor.create_node("logging_demo")?;
7+
8+
let _subscription = node.clone().create_subscription(
9+
"logging_demo",
10+
move |msg: example_interfaces::msg::String| {
11+
let data = msg.data;
12+
13+
// You can apply modifiers such as .once() to node.logger()
14+
// to dictate how the logging behaves.
15+
log!(node.logger().once(), "First message: {data}",);
16+
17+
log!(node.logger().skip_first(), "Subsequent message: {data}",);
18+
19+
// You can chain multiple modifiers together.
20+
log_warn!(
21+
node.logger().skip_first().throttle(Duration::from_secs(5)),
22+
"Throttled message: {data}",
23+
);
24+
},
25+
)?;
26+
27+
// Any &str can be used as the logger name and have
28+
// logging modifiers applied to it.
29+
log_info!(
30+
"notice".once(),
31+
"Ready to begin logging example_interfaces/msg/String messages published to 'logging_demo'.",
32+
);
33+
log_warn!(
34+
"help",
35+
"Try running\n \
36+
$ ros2 topic pub logging_demo example_interfaces/msg/String \"data: message\"",
37+
);
38+
executor.spin(SpinOptions::default()).first_error()
39+
}
Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,31 @@
11
use anyhow::{Error, Result};
2+
use example_interfaces::srv::*;
23
use rclrs::*;
34

45
fn main() -> Result<(), Error> {
56
let mut executor = Context::default_from_env()?.create_basic_executor();
67

78
let node = executor.create_node("minimal_client")?;
89

9-
let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;
10+
let client = node.create_client::<AddTwoInts>("add_two_ints")?;
1011

11-
let request = example_interfaces::srv::AddTwoInts_Request { a: 41, b: 1 };
12+
let promise = executor.commands().run(async move {
13+
println!("Waiting for service...");
14+
client.notify_on_service_ready().await.unwrap();
1215

13-
println!("Starting client");
16+
let request = AddTwoInts_Request { a: 41, b: 1 };
1417

15-
while !client.service_is_ready()? {
16-
std::thread::sleep(std::time::Duration::from_millis(10));
17-
}
18+
println!("Waiting for response");
19+
let response: AddTwoInts_Response = client.call(&request).unwrap().await.unwrap();
1820

19-
client.async_send_request_with_callback(
20-
&request,
21-
move |response: example_interfaces::srv::AddTwoInts_Response| {
22-
println!(
23-
"Result of {} + {} is: {}",
24-
request.a, request.b, response.sum
25-
);
26-
},
27-
)?;
21+
println!(
22+
"Result of {} + {} is: {}",
23+
request.a, request.b, response.sum,
24+
);
25+
});
2826

29-
std::thread::sleep(std::time::Duration::from_millis(500));
30-
31-
println!("Waiting for response");
3227
executor
33-
.spin(SpinOptions::default())
34-
.first_error()
35-
.map_err(|err| err.into())
28+
.spin(SpinOptions::new().until_promise_resolved(promise))
29+
.first_error()?;
30+
Ok(())
3631
}
Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use anyhow::{Error, Result};
2+
use example_interfaces::srv::*;
23
use rclrs::*;
34

45
#[tokio::main]
@@ -7,28 +8,28 @@ async fn main() -> Result<(), Error> {
78

89
let node = executor.create_node("minimal_client")?;
910

10-
let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;
11+
let client = node.create_client::<AddTwoInts>("add_two_ints")?;
1112

1213
println!("Starting client");
1314

1415
while !client.service_is_ready()? {
1516
std::thread::sleep(std::time::Duration::from_millis(10));
1617
}
1718

18-
let request = example_interfaces::srv::AddTwoInts_Request { a: 41, b: 1 };
19+
let request = AddTwoInts_Request { a: 41, b: 1 };
1920

20-
let future = client.call_async(&request);
21+
let promise = client
22+
.call_then(&request, move |response: AddTwoInts_Response| {
23+
println!(
24+
"Result of {} + {} is: {}",
25+
request.a, request.b, response.sum,
26+
);
27+
})
28+
.unwrap();
2129

2230
println!("Waiting for response");
23-
24-
let rclrs_spin = tokio::task::spawn_blocking(move || executor.spin(SpinOptions::default()));
25-
26-
let response = future.await?;
27-
println!(
28-
"Result of {} + {} is: {}",
29-
request.a, request.b, response.sum
30-
);
31-
32-
rclrs_spin.await.ok();
31+
executor
32+
.spin(SpinOptions::new().until_promise_resolved(promise))
33+
.first_error()?;
3334
Ok(())
3435
}
Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
use anyhow::{Error, Result};
2+
use example_interfaces::srv::*;
23
use rclrs::*;
34

4-
fn handle_service(
5-
_request_header: &rclrs::rmw_request_id_t,
6-
request: example_interfaces::srv::AddTwoInts_Request,
7-
) -> example_interfaces::srv::AddTwoInts_Response {
8-
println!("request: {} + {}", request.a, request.b);
9-
example_interfaces::srv::AddTwoInts_Response {
5+
fn handle_service(request: AddTwoInts_Request, info: ServiceInfo) -> AddTwoInts_Response {
6+
let timestamp = info
7+
.received_timestamp
8+
.map(|t| format!(" at [{t:?}]"))
9+
.unwrap_or(String::new());
10+
11+
println!("request{timestamp}: {} + {}", request.a, request.b);
12+
AddTwoInts_Response {
1013
sum: request.a + request.b,
1114
}
1215
}
@@ -16,12 +19,9 @@ fn main() -> Result<(), Error> {
1619

1720
let node = executor.create_node("minimal_service")?;
1821

19-
let _server = node
20-
.create_service::<example_interfaces::srv::AddTwoInts, _>("add_two_ints", handle_service)?;
22+
let _server = node.create_service::<AddTwoInts, _>("add_two_ints", handle_service)?;
2123

2224
println!("Starting server");
23-
executor
24-
.spin(SpinOptions::default())
25-
.first_error()
26-
.map_err(|err| err.into())
25+
executor.spin(SpinOptions::default()).first_error()?;
26+
Ok(())
2727
}

examples/minimal_pub_sub/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ path = "src/zero_copy_publisher.rs"
2929
anyhow = {version = "1", features = ["backtrace"]}
3030
rclrs = "0.4"
3131
rosidl_runtime_rs = "0.4"
32-
std_msgs = "*"
32+
example_interfaces = "*"
3333

3434
# This specific version is compatible with Rust 1.75
3535
backtrace = "=0.3.74"

examples/minimal_pub_sub/package.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@
1414

1515
<build_depend>rclrs</build_depend>
1616
<build_depend>rosidl_runtime_rs</build_depend>
17-
<build_depend>std_msgs</build_depend>
17+
<build_depend>example_interfaces</build_depend>
1818

1919
<exec_depend>rclrs</exec_depend>
2020
<exec_depend>rosidl_runtime_rs</exec_depend>
21-
<exec_depend>std_msgs</exec_depend>
21+
<exec_depend>example_interfaces</exec_depend>
2222

2323
<export>
2424
<build_type>ament_cargo</build_type>

examples/minimal_pub_sub/src/minimal_publisher.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ fn main() -> Result<(), Error> {
77

88
let node = executor.create_node("minimal_publisher")?;
99

10-
let publisher = node.create_publisher::<std_msgs::msg::String>("topic")?;
10+
let publisher = node.create_publisher::<example_interfaces::msg::String>("topic")?;
1111

12-
let mut message = std_msgs::msg::String::default();
12+
let mut message = example_interfaces::msg::String::default();
1313

1414
let mut publish_count: u32 = 1;
1515

examples/minimal_pub_sub/src/minimal_subscriber.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,16 @@ fn main() -> Result<(), Error> {
77

88
let node = executor.create_node("minimal_subscriber")?;
99

10-
let mut num_messages: usize = 0;
11-
12-
let _subscription = node.create_subscription::<std_msgs::msg::String, _>(
10+
let worker = node.create_worker::<usize>(0);
11+
let _subscription = worker.create_subscription::<example_interfaces::msg::String, _>(
1312
"topic",
14-
move |msg: std_msgs::msg::String| {
15-
num_messages += 1;
16-
println!("I heard: '{}'", msg.data);
17-
println!("(Got {} messages so far)", num_messages);
13+
move |num_messages: &mut usize, msg: example_interfaces::msg::String| {
14+
*num_messages += 1;
15+
println!("#{} | I heard: '{}'", *num_messages, msg.data);
1816
},
1917
)?;
2018

21-
executor
22-
.spin(SpinOptions::default())
23-
.first_error()
24-
.map_err(|err| err.into())
19+
println!("Waiting for messages...");
20+
executor.spin(SpinOptions::default()).first_error()?;
21+
Ok(())
2522
}
Lines changed: 36 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,41 @@
11
use rclrs::*;
2-
use std::sync::{
3-
atomic::{AtomicU32, Ordering},
4-
Arc, Mutex,
5-
};
2+
use std::sync::Arc;
63

74
use anyhow::{Error, Result};
85

9-
struct MinimalSubscriber {
10-
num_messages: AtomicU32,
6+
struct MinimalSubscriberNode {
7+
#[allow(unused)]
8+
subscription: WorkerSubscription<example_interfaces::msg::String, SubscriptionData>,
9+
}
10+
11+
struct SubscriptionData {
1112
node: Node,
12-
subscription: Mutex<Option<Subscription<std_msgs::msg::String>>>,
13+
num_messages: usize,
1314
}
1415

15-
impl MinimalSubscriber {
16-
pub fn new(executor: &Executor, name: &str, topic: &str) -> Result<Arc<Self>, RclrsError> {
16+
impl MinimalSubscriberNode {
17+
pub fn new(executor: &Executor, name: &str, topic: &str) -> Result<Self, RclrsError> {
1718
let node = executor.create_node(name)?;
18-
let minimal_subscriber = Arc::new(MinimalSubscriber {
19-
num_messages: 0.into(),
20-
node,
21-
subscription: None.into(),
19+
20+
let worker = node.create_worker::<SubscriptionData>(SubscriptionData {
21+
node: Arc::clone(&node),
22+
num_messages: 0,
2223
});
2324

24-
let minimal_subscriber_aux = Arc::clone(&minimal_subscriber);
25-
let subscription = minimal_subscriber
26-
.node
27-
.create_subscription::<std_msgs::msg::String, _>(
28-
topic,
29-
move |msg: std_msgs::msg::String| {
30-
minimal_subscriber_aux.callback(msg);
31-
},
32-
)?;
33-
*minimal_subscriber.subscription.lock().unwrap() = Some(subscription);
34-
Ok(minimal_subscriber)
35-
}
25+
let subscription = worker.create_subscription(
26+
topic,
27+
|data: &mut SubscriptionData, msg: example_interfaces::msg::String| {
28+
data.num_messages += 1;
29+
println!("[{}] I heard: '{}'", data.node.name(), msg.data);
30+
println!(
31+
"[{}] (Got {} messages so far)",
32+
data.node.name(),
33+
data.num_messages,
34+
);
35+
},
36+
)?;
3637

37-
fn callback(&self, msg: std_msgs::msg::String) {
38-
self.num_messages.fetch_add(1, Ordering::SeqCst);
39-
println!("[{}] I heard: '{}'", self.node.name(), msg.data);
40-
println!(
41-
"[{}] (Got {} messages so far)",
42-
self.node.name(),
43-
self.num_messages.load(Ordering::SeqCst)
44-
);
38+
Ok(MinimalSubscriberNode { subscription })
4539
}
4640
}
4741

@@ -50,14 +44,16 @@ fn main() -> Result<(), Error> {
5044
let publisher_node = executor.create_node("minimal_publisher")?;
5145

5246
let _subscriber_node_one =
53-
MinimalSubscriber::new(&executor, "minimal_subscriber_one", "topic")?;
47+
MinimalSubscriberNode::new(&executor, "minimal_subscriber_one", "topic")?;
5448
let _subscriber_node_two =
55-
MinimalSubscriber::new(&executor, "minimal_subscriber_two", "topic")?;
49+
MinimalSubscriberNode::new(&executor, "minimal_subscriber_two", "topic")?;
5650

57-
let publisher = publisher_node.create_publisher::<std_msgs::msg::String>("topic")?;
51+
let publisher = publisher_node.create_publisher::<example_interfaces::msg::String>("topic")?;
5852

59-
std::thread::spawn(move || -> Result<(), RclrsError> {
60-
let mut message = std_msgs::msg::String::default();
53+
// TODO(@mxgrey): Replace this with a timer once we have the Timer feature
54+
// merged in.
55+
std::thread::spawn(move || -> Result<(), rclrs::RclrsError> {
56+
let mut message = example_interfaces::msg::String::default();
6157
let mut publish_count: u32 = 1;
6258
loop {
6359
message.data = format!("Hello, world! {}", publish_count);
@@ -68,8 +64,6 @@ fn main() -> Result<(), Error> {
6864
}
6965
});
7066

71-
executor
72-
.spin(SpinOptions::default())
73-
.first_error()
74-
.map_err(|err| err.into())
67+
executor.spin(rclrs::SpinOptions::default()).first_error()?;
68+
Ok(())
7569
}

examples/minimal_pub_sub/src/zero_copy_publisher.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ fn main() -> Result<(), Error> {
77

88
let node = executor.create_node("minimal_publisher")?;
99

10-
let publisher = node.create_publisher::<std_msgs::msg::rmw::UInt32>("topic")?;
10+
let publisher = node.create_publisher::<example_interfaces::msg::rmw::UInt32>("topic")?;
1111

1212
let mut publish_count: u32 = 1;
1313

0 commit comments

Comments
 (0)