Skip to content

Commit cf7bd77

Browse files
committed
watched_tasks: spawn long running tasks via special task builder
The WatchedTasksBuilder makes sure that the program ends if any of the tasks fails and also handles error propagation from the tasks to the main() function. Signed-off-by: Leonard Göhrs <l.goehrs@pengutronix.de>
1 parent a807941 commit cf7bd77

33 files changed

+577
-207
lines changed

src/adc.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@ use std::time::Duration;
1919

2020
use anyhow::Result;
2121
use async_std::sync::Arc;
22-
use async_std::task::{sleep, spawn};
22+
use async_std::task::sleep;
2323

2424
use crate::broker::{BrokerBuilder, Topic};
2525
use crate::measurement::{Measurement, Timestamp};
26+
use crate::watched_tasks::WatchedTasksBuilder;
2627

2728
const HISTORY_LENGTH: usize = 200;
2829
const SLOW_INTERVAL: Duration = Duration::from_millis(100);
@@ -77,7 +78,7 @@ pub struct Adc {
7778
}
7879

7980
impl Adc {
80-
pub async fn new(bb: &mut BrokerBuilder) -> Result<Self> {
81+
pub async fn new(bb: &mut BrokerBuilder, wtb: &mut WatchedTasksBuilder) -> Result<Self> {
8182
let stm32_thread = IioThread::new_stm32().await?;
8283
let powerboard_thread = IioThread::new_powerboard().await?;
8384

@@ -212,7 +213,7 @@ impl Adc {
212213

213214
// Spawn an async task to transfer values from the Atomic value based
214215
// "fast" interface to the broker based "slow" interface.
215-
spawn(async move {
216+
wtb.spawn_task("adc-update", async move {
216217
loop {
217218
sleep(SLOW_INTERVAL).await;
218219

src/backlight.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
use anyhow::Result;
1919
use async_std::prelude::*;
2020
use async_std::sync::Arc;
21-
use async_std::task::spawn;
2221
use log::warn;
2322

2423
mod demo_mode;
@@ -30,21 +29,22 @@ use demo_mode::{Backlight as SysBacklight, Brightness, SysClass};
3029
use sysfs_class::{Backlight as SysBacklight, Brightness, SysClass};
3130

3231
use crate::broker::{BrokerBuilder, Topic};
32+
use crate::watched_tasks::WatchedTasksBuilder;
3333

3434
pub struct Backlight {
3535
pub brightness: Arc<Topic<f32>>,
3636
}
3737

3838
impl Backlight {
39-
pub fn new(bb: &mut BrokerBuilder) -> Result<Self> {
39+
pub fn new(bb: &mut BrokerBuilder, wtb: &mut WatchedTasksBuilder) -> Result<Self> {
4040
let brightness = bb.topic_rw("/v1/tac/display/backlight/brightness", Some(1.0));
4141

4242
let (mut rx, _) = brightness.clone().subscribe_unbounded();
4343

4444
let backlight = SysBacklight::new("backlight")?;
4545
let max_brightness = backlight.max_brightness()?;
4646

47-
spawn(async move {
47+
wtb.spawn_task("backlight-dimmer", async move {
4848
while let Some(fraction) = rx.next().await {
4949
let brightness = (max_brightness as f32) * fraction;
5050
let mut brightness = brightness.clamp(0.0, max_brightness as f32) as u64;
@@ -60,6 +60,8 @@ impl Backlight {
6060
warn!("Failed to set LED pattern: {}", e);
6161
}
6262
}
63+
64+
Ok(())
6365
});
6466

6567
Ok(Self { brightness })

src/broker.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
use async_std::sync::Arc;
1919
use serde::{de::DeserializeOwned, Serialize};
2020

21+
use crate::watched_tasks::WatchedTasksBuilder;
22+
2123
mod mqtt_conn;
2224
mod persistence;
2325
mod rest;
@@ -113,10 +115,10 @@ impl BrokerBuilder {
113115
/// Finish building the broker
114116
///
115117
/// This consumes the builder so that no new topics can be registered
116-
pub fn build(self, server: &mut tide::Server<()>) {
118+
pub fn build(self, wtb: &mut WatchedTasksBuilder, server: &mut tide::Server<()>) {
117119
let topics = Arc::new(self.topics);
118120

119-
persistence::register(topics.clone());
121+
persistence::register(wtb, topics.clone());
120122
rest::register(server, topics.clone());
121123
mqtt_conn::register(server, topics);
122124
}

src/broker/persistence.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,14 @@ use anyhow::{bail, Result};
2222
use async_std::channel::{unbounded, Receiver};
2323
use async_std::prelude::*;
2424
use async_std::sync::Arc;
25-
use async_std::task::spawn;
2625
use log::{error, info};
2726
use serde::{Deserialize, Serialize};
2827
use serde_json::{from_reader, to_writer_pretty, Map, Value};
2928

3029
use super::{AnyTopic, TopicName};
3130

31+
use crate::watched_tasks::WatchedTasksBuilder;
32+
3233
#[cfg(feature = "demo_mode")]
3334
const PERSISTENCE_PATH: &str = "demo_files/srv/tacd/state.json";
3435

@@ -147,7 +148,7 @@ async fn save_on_change(
147148
Ok(())
148149
}
149150

150-
pub fn register(topics: Arc<Vec<Arc<dyn AnyTopic>>>) {
151+
pub fn register(wtb: &mut WatchedTasksBuilder, topics: Arc<Vec<Arc<dyn AnyTopic>>>) {
151152
load(&topics).unwrap();
152153

153154
let (tx, rx) = unbounded();
@@ -156,5 +157,5 @@ pub fn register(topics: Arc<Vec<Arc<dyn AnyTopic>>>) {
156157
topic.subscribe_as_bytes(tx.clone(), false);
157158
}
158159

159-
spawn(async move { save_on_change(topics, rx).await.unwrap() });
160+
wtb.spawn_task("persistence-save", save_on_change(topics, rx));
160161
}

src/dbus.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use async_std::sync::Arc;
1919

2020
use crate::broker::{BrokerBuilder, Topic};
2121
use crate::led::BlinkPattern;
22+
use crate::watched_tasks::WatchedTasksBuilder;
2223

2324
#[cfg(feature = "demo_mode")]
2425
mod zb {
@@ -78,6 +79,7 @@ pub struct DbusSession {
7879
impl DbusSession {
7980
pub async fn new(
8081
bb: &mut BrokerBuilder,
82+
wtb: &mut WatchedTasksBuilder,
8183
led_dut: Arc<Topic<BlinkPattern>>,
8284
led_uplink: Arc<Topic<BlinkPattern>>,
8385
) -> Self {
@@ -91,10 +93,10 @@ impl DbusSession {
9193
let conn = Arc::new(tacd.serve(conn_builder).build().await.unwrap());
9294

9395
Self {
94-
hostname: Hostname::new(bb, &conn),
95-
network: Network::new(bb, &conn, led_dut, led_uplink),
96-
rauc: Rauc::new(bb, &conn),
97-
systemd: Systemd::new(bb, &conn).await,
96+
hostname: Hostname::new(bb, wtb, &conn),
97+
network: Network::new(bb, wtb, &conn, led_dut, led_uplink),
98+
rauc: Rauc::new(bb, wtb, &conn),
99+
systemd: Systemd::new(bb, wtb, &conn).await,
98100
}
99101
}
100102
}

src/dbus/hostname.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@ use async_std::stream::StreamExt;
2121
#[cfg(not(feature = "demo_mode"))]
2222
use zbus::Connection;
2323

24-
use crate::broker::{BrokerBuilder, Topic};
2524
use async_std::sync::Arc;
2625

26+
use crate::broker::{BrokerBuilder, Topic};
27+
use crate::watched_tasks::WatchedTasksBuilder;
28+
2729
mod hostnamed;
2830

2931
pub struct Hostname {
@@ -32,19 +34,24 @@ pub struct Hostname {
3234

3335
impl Hostname {
3436
#[cfg(feature = "demo_mode")]
35-
pub fn new<C>(bb: &mut BrokerBuilder, _conn: C) -> Self {
37+
pub fn new<C>(bb: &mut BrokerBuilder, _wtb: &mut WatchedTasksBuilder, _conn: C) -> Self {
3638
Self {
3739
hostname: bb.topic_ro("/v1/tac/network/hostname", Some("lxatac".into())),
3840
}
3941
}
4042

4143
#[cfg(not(feature = "demo_mode"))]
42-
pub fn new(bb: &mut BrokerBuilder, conn: &Arc<Connection>) -> Self {
44+
pub fn new(
45+
bb: &mut BrokerBuilder,
46+
wtb: &mut WatchedTasksBuilder,
47+
conn: &Arc<Connection>,
48+
) -> Self {
4349
let hostname = bb.topic_ro("/v1/tac/network/hostname", None);
4450

4551
let conn = conn.clone();
4652
let hostname_topic = hostname.clone();
47-
async_std::task::spawn(async move {
53+
54+
wtb.spawn_task("hostname-update", async move {
4855
let proxy = hostnamed::HostnameProxy::new(&conn).await.unwrap();
4956

5057
let mut stream = proxy.receive_hostname_changed().await;
@@ -58,6 +65,8 @@ impl Hostname {
5865
hostname_topic.set(h);
5966
}
6067
}
68+
69+
Ok(())
6170
});
6271

6372
Self { hostname }

src/dbus/networkmanager.rs

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize};
2222

2323
use crate::broker::{BrokerBuilder, Topic};
2424
use crate::led::BlinkPattern;
25+
use crate::watched_tasks::WatchedTasksBuilder;
2526

2627
// Macro use makes these modules quite heavy, so we keep them commented
2728
// out until they are actually used
@@ -244,6 +245,7 @@ impl Network {
244245
#[cfg(feature = "demo_mode")]
245246
pub fn new<C>(
246247
bb: &mut BrokerBuilder,
248+
_wtb: &mut WatchedTasksBuilder,
247249
_conn: C,
248250
_led_dut: Arc<Topic<BlinkPattern>>,
249251
_led_uplink: Arc<Topic<BlinkPattern>>,
@@ -266,6 +268,7 @@ impl Network {
266268
#[cfg(not(feature = "demo_mode"))]
267269
pub fn new(
268270
bb: &mut BrokerBuilder,
271+
wtb: &mut WatchedTasksBuilder,
269272
conn: &Arc<Connection>,
270273
led_dut: Arc<Topic<BlinkPattern>>,
271274
led_uplink: Arc<Topic<BlinkPattern>>,
@@ -274,26 +277,20 @@ impl Network {
274277

275278
let conn_task = conn.clone();
276279
let dut_interface = this.dut_interface.clone();
277-
async_std::task::spawn(async move {
278-
handle_link_updates(&conn_task, dut_interface, "dut", led_dut)
279-
.await
280-
.unwrap();
280+
wtb.spawn_task("link-dut-update", async move {
281+
handle_link_updates(&conn_task, dut_interface, "dut", led_dut).await
281282
});
282283

283284
let conn_task = conn.clone();
284285
let uplink_interface = this.uplink_interface.clone();
285-
async_std::task::spawn(async move {
286-
handle_link_updates(&conn_task, uplink_interface, "uplink", led_uplink)
287-
.await
288-
.unwrap();
286+
wtb.spawn_task("link-uplink-update", async move {
287+
handle_link_updates(&conn_task, uplink_interface, "uplink", led_uplink).await
289288
});
290289

291290
let conn_task = conn.clone();
292291
let bridge_interface = this.bridge_interface.clone();
293-
async_std::task::spawn(async move {
294-
handle_ipv4_updates(&conn_task, bridge_interface, "tac-bridge")
295-
.await
296-
.unwrap();
292+
wtb.spawn_task("ip-tac-bridge-update", async move {
293+
handle_ipv4_updates(&conn_task, bridge_interface, "tac-bridge").await
297294
});
298295

299296
this

0 commit comments

Comments
 (0)