Skip to content

Commit e94c996

Browse files
authored
Merge pull request #47 from hnez/watched-tasks
watched_tasks: maintain a list of spawned async tasks and propagate errors
2 parents a807941 + 396136e commit e94c996

39 files changed

+1214
-597
lines changed

src/adc.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@ use std::time::Duration;
1919

2020
use anyhow::Result;
2121
use async_std::sync::Arc;
22-
use async_std::task::{sleep, spawn};
22+
use async_std::task::sleep;
2323

2424
use crate::broker::{BrokerBuilder, Topic};
2525
use crate::measurement::{Measurement, Timestamp};
26+
use crate::watched_tasks::WatchedTasksBuilder;
2627

2728
const HISTORY_LENGTH: usize = 200;
2829
const SLOW_INTERVAL: Duration = Duration::from_millis(100);
@@ -77,9 +78,9 @@ pub struct Adc {
7778
}
7879

7980
impl Adc {
80-
pub async fn new(bb: &mut BrokerBuilder) -> Result<Self> {
81-
let stm32_thread = IioThread::new_stm32().await?;
82-
let powerboard_thread = IioThread::new_powerboard().await?;
81+
pub async fn new(bb: &mut BrokerBuilder, wtb: &mut WatchedTasksBuilder) -> Result<Self> {
82+
let stm32_thread = IioThread::new_stm32(wtb).await?;
83+
let powerboard_thread = IioThread::new_powerboard(wtb).await?;
8384

8485
let adc = Self {
8586
usb_host_curr: AdcChannel {
@@ -212,7 +213,7 @@ impl Adc {
212213

213214
// Spawn an async task to transfer values from the Atomic value based
214215
// "fast" interface to the broker based "slow" interface.
215-
spawn(async move {
216+
wtb.spawn_task("adc-update", async move {
216217
loop {
217218
sleep(SLOW_INTERVAL).await;
218219

@@ -226,7 +227,7 @@ impl Adc {
226227

227228
time.set(Timestamp::now());
228229
}
229-
});
230+
})?;
230231

231232
Ok(adc)
232233
}

src/adc/iio/demo_mode.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ pub struct IioThread {
160160
}
161161

162162
impl IioThread {
163-
pub async fn new_stm32() -> Result<Arc<Self>> {
163+
pub async fn new_stm32<W>(_wtb: &W) -> Result<Arc<Self>> {
164164
let mut demo_magic = block_on(DEMO_MAGIC_STM32.lock());
165165

166166
// Only ever set up a single demo_mode "IioThread" per ADC
@@ -195,7 +195,7 @@ impl IioThread {
195195
Ok(this)
196196
}
197197

198-
pub async fn new_powerboard() -> Result<Arc<Self>> {
198+
pub async fn new_powerboard<W>(_wtb: &W) -> Result<Arc<Self>> {
199199
let mut demo_magic = block_on(DEMO_MAGIC_POWERBOARD.lock());
200200

201201
// Only ever set up a single demo_mode "IioThread" per ADC

src/adc/iio/hardware.rs

Lines changed: 93 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,6 @@ use std::fs::create_dir;
2020
use std::io::Read;
2121
use std::path::Path;
2222
use std::sync::atomic::{AtomicU16, AtomicU64, Ordering};
23-
use std::sync::Mutex;
24-
use std::thread;
25-
use std::thread::JoinHandle;
2623
use std::time::{Duration, Instant};
2724

2825
use anyhow::{anyhow, Context, Error, Result};
@@ -35,6 +32,7 @@ use log::{debug, error, warn};
3532
use thread_priority::*;
3633

3734
use crate::measurement::{Measurement, Timestamp};
35+
use crate::watched_tasks::WatchedTasksBuilder;
3836

3937
struct ChannelDesc {
4038
kernel_name: &'static str,
@@ -255,7 +253,6 @@ pub struct IioThread {
255253
ref_instant: Instant,
256254
timestamp: AtomicU64,
257255
values: Vec<AtomicU16>,
258-
join: Mutex<Option<JoinHandle<()>>>,
259256
channel_descs: &'static [ChannelDesc],
260257
}
261258

@@ -325,7 +322,8 @@ impl IioThread {
325322
}
326323

327324
async fn new(
328-
thread_name: &str,
325+
wtb: &mut WatchedTasksBuilder,
326+
thread_name: &'static str,
329327
adc_name: &'static str,
330328
trigger_name: &'static str,
331329
sample_rate: i64,
@@ -342,102 +340,101 @@ impl IioThread {
342340
let (thread_res_tx, thread_res_rx) = bounded(1);
343341

344342
// Spawn a high priority thread that updates the atomic values in `thread`.
345-
let join = thread::Builder::new()
346-
.name(format!("tacd {thread_name} iio"))
347-
.spawn(move || {
348-
let adc_setup_res = Self::adc_setup(
349-
adc_name,
350-
trigger_name,
351-
sample_rate,
352-
channel_descs,
353-
buffer_len,
354-
);
355-
let (thread, channels, mut buf) = match adc_setup_res {
356-
Ok((channels, buf)) => {
357-
let thread = Arc::new(Self {
358-
ref_instant: Instant::now(),
359-
timestamp: AtomicU64::new(TIMESTAMP_ERROR),
360-
values: channels.iter().map(|_| AtomicU16::new(0)).collect(),
361-
join: Mutex::new(None),
362-
channel_descs,
363-
});
364-
365-
(thread, channels, buf)
366-
}
367-
Err(e) => {
368-
// Can not fail in practice as the queue is known to be empty
369-
// at this point.
370-
thread_res_tx.try_send(Err(e)).unwrap();
371-
return;
372-
}
373-
};
374-
375-
let thread_weak = Arc::downgrade(&thread);
376-
let mut signal_ready = Some((thread, thread_res_tx));
377-
378-
// Stop running as soon as the last reference to this Arc<IioThread>
379-
// is dropped (e.g. the weak reference can no longer be upgraded).
380-
while let Some(thread) = thread_weak.upgrade() {
381-
if let Err(e) = buf.refill() {
382-
thread.timestamp.store(TIMESTAMP_ERROR, Ordering::Relaxed);
383-
384-
error!("Failed to refill {} ADC buffer: {}", adc_name, e);
385-
386-
// If the ADC has not yet produced any values we still have the
387-
// queue at hand that signals readiness to the main thread.
388-
// This gives us a chance to return an Err from new().
389-
// If the queue was already used just print an error instead.
390-
if let Some((_, tx)) = signal_ready.take() {
391-
// Can not fail in practice as the queue is only .take()n
392-
// once and thus known to be empty.
393-
tx.try_send(Err(Error::new(e))).unwrap();
394-
}
395-
396-
break;
397-
}
398-
399-
let values = channels.iter().map(|ch| {
400-
let buf_sum: u32 = buf.channel_iter::<u16>(ch).map(|v| v as u32).sum();
401-
(buf_sum / (buf.capacity() as u32)) as u16
343+
wtb.spawn_thread(thread_name, move || {
344+
let adc_setup_res = Self::adc_setup(
345+
adc_name,
346+
trigger_name,
347+
sample_rate,
348+
channel_descs,
349+
buffer_len,
350+
);
351+
let (thread, channels, mut buf) = match adc_setup_res {
352+
Ok((channels, buf)) => {
353+
let thread = Arc::new(Self {
354+
ref_instant: Instant::now(),
355+
timestamp: AtomicU64::new(TIMESTAMP_ERROR),
356+
values: channels.iter().map(|_| AtomicU16::new(0)).collect(),
357+
channel_descs,
402358
});
403359

404-
for (d, s) in thread.values.iter().zip(values) {
405-
d.store(s, Ordering::Relaxed)
406-
}
360+
(thread, channels, buf)
361+
}
362+
Err(e) => {
363+
// Can not fail in practice as the queue is known to be empty
364+
// at this point.
365+
thread_res_tx
366+
.try_send(Err(e))
367+
.expect("Failed to signal ADC setup error due to full queue");
368+
return Ok(());
369+
}
370+
};
371+
372+
let thread_weak = Arc::downgrade(&thread);
373+
let mut signal_ready = Some((thread, thread_res_tx));
407374

408-
// These should only fail if
409-
// a) The monotonic time started running backward
410-
// b) The tacd has been running for more than 2**64ns (584 years).
411-
let ts: u64 = Instant::now()
412-
.checked_duration_since(thread.ref_instant)
413-
.and_then(|d| d.as_nanos().try_into().ok())
414-
.unwrap_or(TIMESTAMP_ERROR);
375+
// Stop running as soon as the last reference to this Arc<IioThread>
376+
// is dropped (e.g. the weak reference can no longer be upgraded).
377+
while let Some(thread) = thread_weak.upgrade() {
378+
if let Err(e) = buf.refill() {
379+
thread.timestamp.store(TIMESTAMP_ERROR, Ordering::Relaxed);
415380

416-
thread.timestamp.store(ts, Ordering::Release);
381+
error!("Failed to refill {} ADC buffer: {}", adc_name, e);
417382

418-
// Now that we know that the ADC actually works and we have
419-
// initial values: return a handle to it.
420-
if let Some((content, tx)) = signal_ready.take() {
383+
// If the ADC has not yet produced any values we still have the
384+
// queue at hand that signals readiness to the main thread.
385+
// This gives us a chance to return an Err from new().
386+
// If the queue was already used just print an error instead.
387+
if let Some((_, tx)) = signal_ready.take() {
421388
// Can not fail in practice as the queue is only .take()n
422389
// once and thus known to be empty.
423-
tx.try_send(Ok(content)).unwrap();
390+
tx.try_send(Err(Error::new(e)))
391+
.expect("Failed to signal ADC setup error due to full queue");
424392
}
393+
394+
break;
425395
}
426-
})?;
427396

428-
let thread = thread_res_rx.recv().await??;
397+
let values = channels.iter().map(|ch| {
398+
let buf_sum: u32 = buf.channel_iter::<u16>(ch).map(|v| v as u32).sum();
399+
(buf_sum / (buf.capacity() as u32)) as u16
400+
});
429401

430-
// Locking the Mutex could only fail if the Mutex was poisoned by
431-
// a thread that held the lock and panicked.
432-
// At this point the Mutex has not yet been locked in another thread.
433-
*thread.join.lock().unwrap() = Some(join);
402+
for (d, s) in thread.values.iter().zip(values) {
403+
d.store(s, Ordering::Relaxed)
404+
}
405+
406+
// These should only fail if
407+
// a) The monotonic time started running backward
408+
// b) The tacd has been running for more than 2**64ns (584 years).
409+
let ts: u64 = Instant::now()
410+
.checked_duration_since(thread.ref_instant)
411+
.and_then(|d| d.as_nanos().try_into().ok())
412+
.unwrap_or(TIMESTAMP_ERROR);
413+
414+
thread.timestamp.store(ts, Ordering::Release);
415+
416+
// Now that we know that the ADC actually works and we have
417+
// initial values: return a handle to it.
418+
if let Some((content, tx)) = signal_ready.take() {
419+
// Can not fail in practice as the queue is only .take()n
420+
// once and thus known to be empty.
421+
tx.try_send(Ok(content))
422+
.expect("Failed to signal ADC setup completion due to full queue");
423+
}
424+
}
425+
426+
Ok(())
427+
})?;
428+
429+
let thread = thread_res_rx.recv().await??;
434430

435431
Ok(thread)
436432
}
437433

438-
pub async fn new_stm32() -> Result<Arc<Self>> {
434+
pub async fn new_stm32(wtb: &mut WatchedTasksBuilder) -> Result<Arc<Self>> {
439435
Self::new(
440-
"stm32",
436+
wtb,
437+
"adc-stm32",
441438
"48003000.adc:adc@0",
442439
"tim4_trgo",
443440
80,
@@ -447,14 +444,23 @@ impl IioThread {
447444
.await
448445
}
449446

450-
pub async fn new_powerboard() -> Result<Arc<Self>> {
447+
pub async fn new_powerboard(wtb: &mut WatchedTasksBuilder) -> Result<Arc<Self>> {
451448
let hr_trigger_path = Path::new(TRIGGER_HR_PWR_DIR);
452449

453450
if !hr_trigger_path.is_dir() {
454451
create_dir(hr_trigger_path)?;
455452
}
456453

457-
Self::new("powerboard", "lmp92064", "tacd-pwr", 20, CHANNELS_PWR, 1).await
454+
Self::new(
455+
wtb,
456+
"adc-powerboard",
457+
"lmp92064",
458+
"tacd-pwr",
459+
20,
460+
CHANNELS_PWR,
461+
1,
462+
)
463+
.await
458464
}
459465

460466
/// Use the channel names defined at the top of the file to get a reference

src/adc/iio/test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ pub struct IioThread {
107107
}
108108

109109
impl IioThread {
110-
pub async fn new_stm32() -> Result<Arc<Self>> {
110+
pub async fn new_stm32<W>(_wtb: &W) -> Result<Arc<Self>> {
111111
let mut channels = Vec::new();
112112

113113
for name in CHANNELS_STM32 {
@@ -117,7 +117,7 @@ impl IioThread {
117117
Ok(Arc::new(Self { channels }))
118118
}
119119

120-
pub async fn new_powerboard() -> Result<Arc<Self>> {
120+
pub async fn new_powerboard<W>(_wtb: &W) -> Result<Arc<Self>> {
121121
let mut channels = Vec::new();
122122

123123
for name in CHANNELS_PWR {

src/backlight.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
use anyhow::Result;
1919
use async_std::prelude::*;
2020
use async_std::sync::Arc;
21-
use async_std::task::spawn;
2221
use log::warn;
2322

2423
mod demo_mode;
@@ -30,21 +29,22 @@ use demo_mode::{Backlight as SysBacklight, Brightness, SysClass};
3029
use sysfs_class::{Backlight as SysBacklight, Brightness, SysClass};
3130

3231
use crate::broker::{BrokerBuilder, Topic};
32+
use crate::watched_tasks::WatchedTasksBuilder;
3333

3434
pub struct Backlight {
3535
pub brightness: Arc<Topic<f32>>,
3636
}
3737

3838
impl Backlight {
39-
pub fn new(bb: &mut BrokerBuilder) -> Result<Self> {
39+
pub fn new(bb: &mut BrokerBuilder, wtb: &mut WatchedTasksBuilder) -> Result<Self> {
4040
let brightness = bb.topic_rw("/v1/tac/display/backlight/brightness", Some(1.0));
4141

4242
let (mut rx, _) = brightness.clone().subscribe_unbounded();
4343

4444
let backlight = SysBacklight::new("backlight")?;
4545
let max_brightness = backlight.max_brightness()?;
4646

47-
spawn(async move {
47+
wtb.spawn_task("backlight-dimmer", async move {
4848
while let Some(fraction) = rx.next().await {
4949
let brightness = (max_brightness as f32) * fraction;
5050
let mut brightness = brightness.clamp(0.0, max_brightness as f32) as u64;
@@ -60,7 +60,9 @@ impl Backlight {
6060
warn!("Failed to set LED pattern: {}", e);
6161
}
6262
}
63-
});
63+
64+
Ok(())
65+
})?;
6466

6567
Ok(Self { brightness })
6668
}

src/broker.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@
1515
// with this program; if not, write to the Free Software Foundation, Inc.,
1616
// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
1717

18+
use anyhow::Result;
1819
use async_std::sync::Arc;
1920
use serde::{de::DeserializeOwned, Serialize};
2021

22+
use crate::watched_tasks::WatchedTasksBuilder;
23+
2124
mod mqtt_conn;
2225
mod persistence;
2326
mod rest;
@@ -113,11 +116,13 @@ impl BrokerBuilder {
113116
/// Finish building the broker
114117
///
115118
/// This consumes the builder so that no new topics can be registered
116-
pub fn build(self, server: &mut tide::Server<()>) {
119+
pub fn build(self, wtb: &mut WatchedTasksBuilder, server: &mut tide::Server<()>) -> Result<()> {
117120
let topics = Arc::new(self.topics);
118121

119-
persistence::register(topics.clone());
122+
persistence::register(wtb, topics.clone())?;
120123
rest::register(server, topics.clone());
121124
mqtt_conn::register(server, topics);
125+
126+
Ok(())
122127
}
123128
}

0 commit comments

Comments
 (0)