@@ -27,7 +27,6 @@ use std::time::{Duration, Instant};
27
27
28
28
use anyhow:: { anyhow, Context , Error , Result } ;
29
29
use async_std:: channel:: bounded;
30
- use async_std:: stream:: StreamExt ;
31
30
use async_std:: sync:: Arc ;
32
31
33
32
use industrial_io:: { Buffer , Channel } ;
@@ -106,8 +105,20 @@ const CHANNELS_PWR: &[ChannelDesc] = &[
106
105
107
106
const TRIGGER_HR_PWR_DIR : & str = "/sys/kernel/config/iio/triggers/hrtimer/tacd-pwr" ;
108
107
108
+ // Timestamps are stored in a 64 Bit atomic variable containing the
109
+ // time in nanoseconds passed since the tacd was started.
110
+ // To reach u64::MAX the tacd would need to run for 2^64ns which is
111
+ // about 584 years.
109
112
const TIMESTAMP_ERROR : u64 = u64:: MAX ;
110
113
114
+ #[ derive( Debug ) ]
115
+ pub enum AdcReadError {
116
+ Again ,
117
+ MismatchedChannels ,
118
+ AquisitionError ,
119
+ TimeStampError ,
120
+ }
121
+
111
122
#[ derive( Clone , Copy ) ]
112
123
struct Calibration {
113
124
scale : f32 ,
@@ -185,54 +196,56 @@ impl CalibratedChannel {
185
196
pub fn try_get_multiple < const N : usize > (
186
197
& self ,
187
198
channels : [ & Self ; N ] ,
188
- ) -> Option < [ Measurement ; N ] > {
199
+ ) -> Result < [ Measurement ; N ] , AdcReadError > {
189
200
let ts_before = self . iio_thread . timestamp . load ( Ordering :: Acquire ) ;
190
201
191
202
let mut values_raw = [ 0 ; N ] ;
192
203
for ( d, ch) in values_raw. iter_mut ( ) . zip ( channels. iter ( ) ) {
193
- assert ! (
194
- Arc :: ptr_eq( & self . iio_thread, & ch. iio_thread) ,
195
- "Can only get synchronized adc values for the same thread"
196
- ) ;
204
+ // Can only get time-aligned values for channels of the same ADC
205
+ if !Arc :: ptr_eq ( & self . iio_thread , & ch. iio_thread ) {
206
+ return Err ( AdcReadError :: MismatchedChannels ) ;
207
+ }
208
+
197
209
* d = self . iio_thread . values [ ch. index ] . load ( Ordering :: Relaxed ) ;
198
210
}
199
211
200
212
let ts_after = self . iio_thread . timestamp . load ( Ordering :: Acquire ) ;
201
213
202
214
if ts_before == TIMESTAMP_ERROR || ts_after == TIMESTAMP_ERROR {
203
- panic ! ( "Failed to read from ADC" ) ;
215
+ return Err ( AdcReadError :: AquisitionError ) ;
204
216
}
205
217
206
218
if ts_before == ts_after {
207
219
let ts = self
208
220
. iio_thread
209
221
. ref_instant
210
222
. checked_add ( Duration :: from_nanos ( ts_before) )
211
- . unwrap ( ) ;
223
+ . ok_or ( AdcReadError :: TimeStampError ) ? ;
212
224
let ts = Timestamp :: new ( ts) ;
213
225
214
226
let mut values = [ Measurement { ts, value : 0.0 } ; N ] ;
215
227
for i in 0 ..N {
216
228
values[ i] . value = channels[ i] . calibration . apply ( values_raw[ i] as f32 ) ;
217
229
}
218
230
219
- Some ( values)
231
+ Ok ( values)
220
232
} else {
221
- None
233
+ Err ( AdcReadError :: Again )
222
234
}
223
235
}
224
236
225
237
/// Get the value of the channel, or None if the timestamp changed while
226
238
/// reading the value (which should be extremely rare)
227
- pub fn try_get ( & self ) -> Option < Measurement > {
239
+ pub fn try_get ( & self ) -> Result < Measurement , AdcReadError > {
228
240
self . try_get_multiple ( [ self ] ) . map ( |res| res[ 0 ] )
229
241
}
230
242
231
243
// Get the current value of the channel
232
- pub fn get ( & self ) -> Measurement {
244
+ pub fn get ( & self ) -> Result < Measurement , AdcReadError > {
233
245
loop {
234
- if let Some ( r) = self . try_get ( ) {
235
- break r;
246
+ match self . try_get ( ) {
247
+ Err ( AdcReadError :: Again ) => { }
248
+ res => break res,
236
249
}
237
250
}
238
251
}
@@ -269,18 +282,23 @@ impl IioThread {
269
282
warn ! ( "Failed to disable {} ADC buffer: {}" , adc_name, err) ;
270
283
}
271
284
272
- let channels: Vec < Channel > = channel_descs
285
+ let channels: Result < Vec < Channel > > = channel_descs
273
286
. iter ( )
274
287
. map ( |ChannelDesc { kernel_name, .. } | {
275
288
let ch = adc
276
289
. find_channel ( kernel_name, false )
277
- . unwrap_or_else ( || panic ! ( "Failed to open kernel channel {}" , kernel_name) ) ;
290
+ . ok_or_else ( || anyhow ! ( "Failed to open iio channel {}" , kernel_name) ) ;
291
+
292
+ if let Ok ( ch) = ch. as_ref ( ) {
293
+ ch. enable ( ) ;
294
+ }
278
295
279
- ch. enable ( ) ;
280
296
ch
281
297
} )
282
298
. collect ( ) ;
283
299
300
+ let channels = channels?;
301
+
284
302
let trig = ctx
285
303
. find_device ( trigger_name)
286
304
. ok_or ( anyhow ! ( "Could not find IIO trigger: {}" , trigger_name) ) ?;
@@ -292,9 +310,13 @@ impl IioThread {
292
310
293
311
let buf = adc. create_buffer ( buffer_len, false ) ?;
294
312
313
+ let prio = ThreadPriorityValue :: try_from ( 10 ) . map_err ( |e| {
314
+ anyhow ! ( "Failed to set thread priority to 10 as you OS does not support it: {e:?}" )
315
+ } ) ?;
316
+
295
317
set_thread_priority_and_policy (
296
318
thread_native_id ( ) ,
297
- ThreadPriority :: Crossplatform ( ThreadPriorityValue :: try_from ( 10 ) . unwrap ( ) ) ,
319
+ ThreadPriority :: Crossplatform ( prio ) ,
298
320
ThreadSchedulePolicy :: Realtime ( RealtimeThreadSchedulePolicy :: Fifo ) ,
299
321
)
300
322
. map_err ( |e| anyhow ! ( "Failed to set realtime thread priority: {e:?}" ) ) ?;
@@ -317,7 +339,7 @@ impl IioThread {
317
339
// setup was sucessful.
318
340
// This is why we create Self inside the thread and send it back
319
341
// to the calling thread via a queue.
320
- let ( thread_res_tx, mut thread_res_rx) = bounded ( 1 ) ;
342
+ let ( thread_res_tx, thread_res_rx) = bounded ( 1 ) ;
321
343
322
344
// Spawn a high priority thread that updates the atomic values in `thread`.
323
345
let join = thread:: Builder :: new ( )
@@ -343,6 +365,8 @@ impl IioThread {
343
365
( thread, channels, buf)
344
366
}
345
367
Err ( e) => {
368
+ // Can not fail in practice as the queue is known to be empty
369
+ // at this point.
346
370
thread_res_tx. try_send ( Err ( e) ) . unwrap ( ) ;
347
371
return ;
348
372
}
@@ -364,6 +388,8 @@ impl IioThread {
364
388
// This gives us a chance to return an Err from new().
365
389
// If the queue was already used just print an error instead.
366
390
if let Some ( ( _, tx) ) = signal_ready. take ( ) {
391
+ // Can not fail in practice as the queue is only .take()n
392
+ // once and thus known to be empty.
367
393
tx. try_send ( Err ( Error :: new ( e) ) ) . unwrap ( ) ;
368
394
}
369
395
@@ -379,24 +405,31 @@ impl IioThread {
379
405
d. store ( s, Ordering :: Relaxed )
380
406
}
381
407
408
+ // These should only fail if
409
+ // a) The monotonic time started running backward
410
+ // b) The tacd has been running for more than 2**64ns (584 years).
382
411
let ts: u64 = Instant :: now ( )
383
412
. checked_duration_since ( thread. ref_instant )
384
- . unwrap ( )
385
- . as_nanos ( )
386
- . try_into ( )
387
- . unwrap ( ) ;
413
+ . and_then ( |d| d. as_nanos ( ) . try_into ( ) . ok ( ) )
414
+ . unwrap_or ( TIMESTAMP_ERROR ) ;
388
415
389
416
thread. timestamp . store ( ts, Ordering :: Release ) ;
390
417
391
418
// Now that we know that the ADC actually works and we have
392
419
// initial values: return a handle to it.
393
420
if let Some ( ( content, tx) ) = signal_ready. take ( ) {
421
+ // Can not fail in practice as the queue is only .take()n
422
+ // once and thus known to be empty.
394
423
tx. try_send ( Ok ( content) ) . unwrap ( ) ;
395
424
}
396
425
}
397
426
} ) ?;
398
427
399
- let thread = thread_res_rx. next ( ) . await . unwrap ( ) ?;
428
+ let thread = thread_res_rx. recv ( ) . await ??;
429
+
430
+ // Locking the Mutex could only fail if the Mutex was poisoned by
431
+ // a thread that held the lock and panicked.
432
+ // At this point the Mutex has not yet been locked in another thread.
400
433
* thread. join . lock ( ) . unwrap ( ) = Some ( join) ;
401
434
402
435
Ok ( thread)
@@ -418,7 +451,7 @@ impl IioThread {
418
451
let hr_trigger_path = Path :: new ( TRIGGER_HR_PWR_DIR ) ;
419
452
420
453
if !hr_trigger_path. is_dir ( ) {
421
- create_dir ( hr_trigger_path) . unwrap ( ) ;
454
+ create_dir ( hr_trigger_path) ? ;
422
455
}
423
456
424
457
Self :: new ( "powerboard" , "lmp92064" , "tacd-pwr" , 20 , CHANNELS_PWR , 1 ) . await
0 commit comments