Skip to content

Commit d10a008

Browse files
committed
watched_tasks: spawn long running tasks via special task builder
The WatchedTasksBuilder makes sure that the program ends if any of the tasks fails and also handles error propagation from the tasks to the main() function. Signed-off-by: Leonard Göhrs <l.goehrs@pengutronix.de>
1 parent 3cb20ac commit d10a008

30 files changed

+523
-178
lines changed

src/adc.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@ use std::time::Duration;
1919

2020
use anyhow::Result;
2121
use async_std::sync::Arc;
22-
use async_std::task::{sleep, spawn};
22+
use async_std::task::sleep;
2323

2424
use crate::broker::{BrokerBuilder, Topic};
2525
use crate::measurement::{Measurement, Timestamp};
26+
use crate::watched_tasks::WatchedTasksBuilder;
2627

2728
const HISTORY_LENGTH: usize = 200;
2829
const SLOW_INTERVAL: Duration = Duration::from_millis(100);
@@ -77,7 +78,7 @@ pub struct Adc {
7778
}
7879

7980
impl Adc {
80-
pub async fn new(bb: &mut BrokerBuilder) -> Result<Self> {
81+
pub async fn new(bb: &mut BrokerBuilder, wtb: &mut WatchedTasksBuilder) -> Result<Self> {
8182
let stm32_thread = IioThread::new_stm32().await?;
8283
let powerboard_thread = IioThread::new_powerboard().await?;
8384

@@ -212,7 +213,7 @@ impl Adc {
212213

213214
// Spawn an async task to transfer values from the Atomic value based
214215
// "fast" interface to the broker based "slow" interface.
215-
spawn(async move {
216+
wtb.spawn_task("adc-update", async move {
216217
loop {
217218
sleep(SLOW_INTERVAL).await;
218219

src/backlight.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
use anyhow::Result;
1919
use async_std::prelude::*;
2020
use async_std::sync::Arc;
21-
use async_std::task::spawn;
2221
use log::warn;
2322

2423
mod demo_mode;
@@ -30,21 +29,22 @@ use demo_mode::{Backlight as SysBacklight, Brightness, SysClass};
3029
use sysfs_class::{Backlight as SysBacklight, Brightness, SysClass};
3130

3231
use crate::broker::{BrokerBuilder, Topic};
32+
use crate::watched_tasks::WatchedTasksBuilder;
3333

3434
pub struct Backlight {
3535
pub brightness: Arc<Topic<f32>>,
3636
}
3737

3838
impl Backlight {
39-
pub fn new(bb: &mut BrokerBuilder) -> Result<Self> {
39+
pub fn new(bb: &mut BrokerBuilder, wtb: &mut WatchedTasksBuilder) -> Result<Self> {
4040
let brightness = bb.topic_rw("/v1/tac/display/backlight/brightness", Some(1.0));
4141

4242
let (mut rx, _) = brightness.clone().subscribe_unbounded();
4343

4444
let backlight = SysBacklight::new("backlight")?;
4545
let max_brightness = backlight.max_brightness()?;
4646

47-
spawn(async move {
47+
wtb.spawn_task("backlight-dimmer", async move {
4848
while let Some(fraction) = rx.next().await {
4949
let brightness = (max_brightness as f32) * fraction;
5050
let mut brightness = brightness.clamp(0.0, max_brightness as f32) as u64;
@@ -60,6 +60,8 @@ impl Backlight {
6060
warn!("Failed to set LED pattern: {}", e);
6161
}
6262
}
63+
64+
Ok(())
6365
});
6466

6567
Ok(Self { brightness })

src/broker.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
use async_std::sync::Arc;
1919
use serde::{de::DeserializeOwned, Serialize};
2020

21+
use crate::watched_tasks::WatchedTasksBuilder;
22+
2123
mod mqtt_conn;
2224
mod persistence;
2325
mod rest;
@@ -113,10 +115,10 @@ impl BrokerBuilder {
113115
/// Finish building the broker
114116
///
115117
/// This consumes the builder so that no new topics can be registered
116-
pub fn build(self, server: &mut tide::Server<()>) {
118+
pub fn build(self, wtb: &mut WatchedTasksBuilder, server: &mut tide::Server<()>) {
117119
let topics = Arc::new(self.topics);
118120

119-
persistence::register(topics.clone());
121+
persistence::register(wtb, topics.clone());
120122
rest::register(server, topics.clone());
121123
mqtt_conn::register(server, topics);
122124
}

src/broker/persistence.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,14 @@ use anyhow::{bail, Result};
2222
use async_std::channel::{unbounded, Receiver};
2323
use async_std::prelude::*;
2424
use async_std::sync::Arc;
25-
use async_std::task::spawn;
2625
use log::{error, info};
2726
use serde::{Deserialize, Serialize};
2827
use serde_json::{from_reader, to_writer_pretty, Map, Value};
2928

3029
use super::{AnyTopic, TopicName};
3130

31+
use crate::watched_tasks::WatchedTasksBuilder;
32+
3233
#[cfg(feature = "demo_mode")]
3334
const PERSISTENCE_PATH: &str = "demo_files/srv/tacd/state.json";
3435

@@ -147,7 +148,7 @@ async fn save_on_change(
147148
Ok(())
148149
}
149150

150-
pub fn register(topics: Arc<Vec<Arc<dyn AnyTopic>>>) {
151+
pub fn register(wtb: &mut WatchedTasksBuilder, topics: Arc<Vec<Arc<dyn AnyTopic>>>) {
151152
load(&topics).unwrap();
152153

153154
let (tx, rx) = unbounded();
@@ -156,5 +157,5 @@ pub fn register(topics: Arc<Vec<Arc<dyn AnyTopic>>>) {
156157
topic.subscribe_as_bytes(tx.clone(), false);
157158
}
158159

159-
spawn(async move { save_on_change(topics, rx).await.unwrap() });
160+
wtb.spawn_task("persistence-save", save_on_change(topics, rx));
160161
}

src/dbus.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use async_std::sync::Arc;
1919

2020
use crate::broker::{BrokerBuilder, Topic};
2121
use crate::led::BlinkPattern;
22+
use crate::watched_tasks::WatchedTasksBuilder;
2223

2324
#[cfg(feature = "demo_mode")]
2425
mod zb {
@@ -74,6 +75,7 @@ pub struct DbusSession {
7475
impl DbusSession {
7576
pub async fn new(
7677
bb: &mut BrokerBuilder,
78+
wtb: &mut WatchedTasksBuilder,
7779
led_dut: Arc<Topic<BlinkPattern>>,
7880
led_uplink: Arc<Topic<BlinkPattern>>,
7981
) -> Self {
@@ -87,9 +89,9 @@ impl DbusSession {
8789
let conn = Arc::new(tacd.serve(conn_builder).build().await.unwrap());
8890

8991
Self {
90-
network: Network::new(bb, &conn, led_dut, led_uplink),
91-
rauc: Rauc::new(bb, &conn),
92-
systemd: Systemd::new(bb, &conn).await,
92+
network: Network::new(bb, wtb, &conn, led_dut, led_uplink),
93+
rauc: Rauc::new(bb, wtb, &conn),
94+
systemd: Systemd::new(bb, wtb, &conn).await,
9395
}
9496
}
9597
}

src/dbus/networkmanager/mod.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize};
2222

2323
use crate::broker::{BrokerBuilder, Topic};
2424
use crate::led::BlinkPattern;
25+
use crate::watched_tasks::WatchedTasksBuilder;
2526

2627
mod devices;
2728
mod hostname;
@@ -256,6 +257,7 @@ impl Network {
256257
#[cfg(feature = "demo_mode")]
257258
pub fn new<C>(
258259
bb: &mut BrokerBuilder,
260+
_wtb: &mut WatchedTasksBuilder,
259261
_conn: C,
260262
_led_dut: Arc<Topic<BlinkPattern>>,
261263
_led_uplink: Arc<Topic<BlinkPattern>>,
@@ -279,6 +281,7 @@ impl Network {
279281
#[cfg(not(feature = "demo_mode"))]
280282
pub fn new(
281283
bb: &mut BrokerBuilder,
284+
wtb: &mut WatchedTasksBuilder,
282285
conn: &Arc<Connection>,
283286
led_dut: Arc<Topic<BlinkPattern>>,
284287
led_uplink: Arc<Topic<BlinkPattern>>,
@@ -288,7 +291,7 @@ impl Network {
288291
{
289292
let conn = conn.clone();
290293
let hostname_topic = this.hostname.clone();
291-
async_std::task::spawn(async move {
294+
wtb.spawn_task("hostname-update", async move {
292295
let proxy = hostname::HostnameProxy::new(&conn).await.unwrap();
293296

294297
let mut stream = proxy.receive_hostname_changed().await;
@@ -302,13 +305,15 @@ impl Network {
302305
hostname_topic.set(h);
303306
}
304307
}
308+
309+
Ok(())
305310
});
306311
}
307312

308313
{
309314
let conn = conn.clone();
310315
let dut_interface = this.dut_interface.clone();
311-
async_std::task::spawn(async move {
316+
wtb.spawn_task("link-dut-update", async move {
312317
let mut link_stream = loop {
313318
if let Ok(ls) = LinkStream::new(conn.clone(), "dut").await {
314319
break ls;
@@ -330,13 +335,15 @@ impl Network {
330335

331336
dut_interface.set(info);
332337
}
338+
339+
Ok(())
333340
});
334341
}
335342

336343
{
337344
let conn = conn.clone();
338345
let uplink_interface = this.uplink_interface.clone();
339-
async_std::task::spawn(async move {
346+
wtb.spawn_task("link-uplink-update", async move {
340347
let mut link_stream = loop {
341348
if let Ok(ls) = LinkStream::new(conn.clone(), "uplink").await {
342349
break ls;
@@ -355,13 +362,15 @@ impl Network {
355362

356363
uplink_interface.set(info);
357364
}
365+
366+
Ok(())
358367
});
359368
}
360369

361370
{
362371
let conn = conn.clone();
363372
let bridge_interface = this.bridge_interface.clone();
364-
async_std::task::spawn(async move {
373+
wtb.spawn_task("ip-tac-bridge-update", async move {
365374
let mut ip_stream = loop {
366375
if let Ok(ips) = IpStream::new(conn.clone(), "tac-bridge").await {
367376
break ips;
@@ -375,6 +384,8 @@ impl Network {
375384
while let Ok(info) = ip_stream.next(&conn).await {
376385
bridge_interface.set(info);
377386
}
387+
388+
Ok(())
378389
});
379390
}
380391

0 commit comments

Comments
 (0)