Skip to content

Commit b072db6

Browse files
committed
refactor(test_runner): hold a lock of probe_rs::Session in ReadRtt
This makes more sense than the current design where the lock is acquired on each read operation, which pointlessly allows for having two instances of `ReadRtt` pointing to the same `Session` and performing interleaved reads.
1 parent b377bcd commit b072db6

File tree

4 files changed

+86
-51
lines changed

4 files changed

+86
-51
lines changed

Cargo.lock

Lines changed: 16 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/r3_test_runner/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ aho-corasick = { version = "0.7.13" }
1313
futures-core = { version = "0.3.5" }
1414
probe-rs-rtt = { version = "0.12.0" }
1515
tokio-serial = { version = "5.4.1" }
16+
async-mutex = { version = "1.4.0" }
1617
lazy_static = { version = "1.4.0" }
1718
env_logger = { version = "0.8.4" }
1819
serde_json = { version = "1.0.57" }

src/r3_test_runner/src/targets/jlink.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
use anyhow::Result;
2+
use async_mutex::Mutex as AsyncMutex;
23
use std::{
34
fmt::Write,
45
future::Future,
56
io,
67
path::Path,
78
pin::Pin,
8-
sync::{Arc, Mutex},
9+
sync::Arc,
910
task::{Context, Poll},
1011
};
1112
use tempdir::TempDir;
@@ -159,13 +160,13 @@ impl DebugProbe for Fe310JLinkDebugProbe {
159160
let probe = probe_rs::Probe::open(selector).map_err(RunError::OpenProbe)?;
160161

161162
let selector: probe_rs::config::TargetSelector = "riscv".try_into().unwrap();
162-
let session = Arc::new(Mutex::new(
163+
let session = Arc::new(AsyncMutex::new(
163164
probe.attach(selector).map_err(RunError::Attach)?,
164165
));
165166

166167
// Open the RTT channels
167168
Ok(super::probe_rs::attach_rtt(
168-
session,
169+
session.try_lock_arc().unwrap(),
169170
&exe,
170171
super::probe_rs::RttOptions {
171172
// The RISC-V External Debug Support specification 0.13 (to

src/r3_test_runner/src/targets/probe_rs.rs

Lines changed: 65 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
use anyhow::Result;
2+
use async_mutex::{Mutex as AsyncMutex, MutexGuardArc as AsyncMutexGuard};
23
use futures_core::ready;
34
use std::{
45
future::Future,
56
io::Write,
67
mem::replace,
78
path::Path,
89
pin::Pin,
9-
sync::{Arc, Mutex},
10+
sync::Arc,
1011
task::{Context, Poll},
1112
time::{Duration, Instant},
1213
};
@@ -60,7 +61,7 @@ impl Target for NucleoF401re {
6061
}
6162

6263
struct ProbeRsDebugProbe {
63-
session: Arc<Mutex<probe_rs::Session>>,
64+
session: Arc<AsyncMutex<probe_rs::Session>>,
6465
}
6566

6667
#[derive(thiserror::Error, Debug)]
@@ -86,7 +87,7 @@ impl ProbeRsDebugProbe {
8687
) -> anyhow::Result<Self> {
8788
let probe = probe_rs::Probe::open(probe_sel).map_err(OpenError::OpenProbe)?;
8889

89-
let session = Arc::new(Mutex::new(
90+
let session = Arc::new(AsyncMutex::new(
9091
probe.attach(target_sel).map_err(OpenError::Attach)?,
9192
));
9293

@@ -100,28 +101,29 @@ impl DebugProbe for ProbeRsDebugProbe {
100101
exe: &Path,
101102
) -> Pin<Box<dyn Future<Output = Result<DynAsyncRead<'_>>> + '_>> {
102103
let exe = exe.to_owned();
103-
let session = Arc::clone(&self.session);
104104

105105
Box::pin(async move {
106+
let mut session = self.session.lock_arc().await;
107+
106108
// Flash the executable
107109
log::debug!("Flashing '{0}'", exe.display());
108110

109-
let session2 = Arc::clone(&session);
110111
let exe2 = exe.clone();
111-
spawn_blocking(move || {
112-
let mut session_lock = session2.lock().unwrap();
112+
let mut session = spawn_blocking(move || {
113113
probe_rs::flashing::download_file(
114-
&mut *session_lock,
114+
&mut *session,
115115
&exe2,
116116
probe_rs::flashing::Format::Elf,
117-
)
117+
)?;
118+
Ok(session)
118119
})
119120
.await
120121
.unwrap()
121122
.map_err(RunError::Flash)?;
122123

123124
// Reset the core
124-
(session.lock().unwrap().core(0))
125+
session
126+
.core(0)
125127
.map_err(RunError::Reset)?
126128
.reset()
127129
.map_err(RunError::Reset)?;
@@ -152,7 +154,7 @@ pub struct RttOptions {
152154
}
153155

154156
pub async fn attach_rtt(
155-
session: Arc<Mutex<probe_rs::Session>>,
157+
mut session: AsyncMutexGuard<probe_rs::Session>,
156158
exe: &Path,
157159
options: RttOptions,
158160
) -> Result<DynAsyncRead<'static>, AttachRttError> {
@@ -185,32 +187,32 @@ pub async fn attach_rtt(
185187
// Attach to RTT
186188
let start = Instant::now();
187189
let rtt = loop {
188-
let session = session.clone();
189190
let halt_on_access = options.halt_on_access;
190191
let rtt_scan_region = rtt_scan_region.clone();
191192

192-
let result = spawn_blocking(move || {
193-
let mut session = session.lock().unwrap();
193+
let (result, session2) = spawn_blocking(move || {
194194
let memory_map = session.target().memory_map.clone();
195-
let mut core = session.core(0).map_err(AttachRttError::HaltCore)?;
196-
let halt_guard;
197-
let core = if halt_on_access {
198-
halt_guard = CoreHaltGuard::new(&mut core).map_err(AttachRttError::HaltCore)?;
199-
&mut *halt_guard.core
200-
} else {
201-
&mut core
202-
};
195+
let result = {
196+
let mut core = session.core(0).map_err(AttachRttError::HaltCore)?;
197+
let halt_guard;
198+
let core = if halt_on_access {
199+
halt_guard = CoreHaltGuard::new(&mut core).map_err(AttachRttError::HaltCore)?;
200+
&mut *halt_guard.core
201+
} else {
202+
&mut core
203+
};
203204

204-
let result = match probe_rs_rtt::Rtt::attach_region(core, &memory_map, &rtt_scan_region)
205-
{
206-
Ok(rtt) => Some(rtt),
207-
Err(probe_rs_rtt::Error::ControlBlockNotFound) => None,
208-
Err(e) => return Err(AttachRttError::AttachRtt(e)),
205+
match probe_rs_rtt::Rtt::attach_region(core, &memory_map, &rtt_scan_region) {
206+
Ok(rtt) => Some(rtt),
207+
Err(probe_rs_rtt::Error::ControlBlockNotFound) => None,
208+
Err(e) => return Err(AttachRttError::AttachRtt(e)),
209+
}
209210
};
210-
Ok(result)
211+
Ok((result, session))
211212
})
212213
.await
213214
.unwrap()?;
215+
session = session2;
214216

215217
if let Some(rtt) = result {
216218
break rtt;
@@ -271,7 +273,6 @@ impl Drop for CoreHaltGuard<'_, '_> {
271273
}
272274

273275
struct ReadRtt {
274-
session: Arc<Mutex<probe_rs::Session>>,
275276
options: RttOptions,
276277
st: ReadRttSt,
277278
}
@@ -281,19 +282,28 @@ enum ReadRttSt {
281282
/// `<ReadRtt as AsyncRead>`.
282283
Idle {
283284
buf: ReadRttBuf,
285+
session: AsyncMutexGuard<probe_rs::Session>,
284286
rtt: Box<probe_rs_rtt::Rtt>,
285287
pos: usize,
286288
len: usize,
287289
},
288290

289291
/// `ReadRtt` is currently fetching new data from RTT channels.
290292
Read {
291-
join_handle: JoinHandle<tokio::io::Result<(ReadRttBuf, usize, Box<probe_rs_rtt::Rtt>)>>,
293+
join_handle: JoinHandle<
294+
tokio::io::Result<(
295+
ReadRttBuf,
296+
AsyncMutexGuard<probe_rs::Session>,
297+
usize,
298+
Box<probe_rs_rtt::Rtt>,
299+
)>,
300+
>,
292301
},
293302

294303
/// `ReadRtt` is waiting for some time before trying reading again.
295304
PollDelay {
296305
buf: ReadRttBuf,
306+
session: AsyncMutexGuard<probe_rs::Session>,
297307
rtt: Box<probe_rs_rtt::Rtt>,
298308
delay: Pin<Box<Sleep>>,
299309
},
@@ -305,15 +315,15 @@ type ReadRttBuf = Box<[u8; 1024]>;
305315

306316
impl ReadRtt {
307317
fn new(
308-
session: Arc<Mutex<probe_rs::Session>>,
318+
session: AsyncMutexGuard<probe_rs::Session>,
309319
rtt: probe_rs_rtt::Rtt,
310320
options: RttOptions,
311321
) -> Self {
312322
Self {
313-
session,
314323
options,
315324
st: ReadRttSt::Idle {
316325
buf: Box::new([0u8; 1024]),
326+
session,
317327
rtt: Box::new(rtt),
318328
pos: 0,
319329
len: 0,
@@ -346,22 +356,28 @@ impl AsyncBufRead for ReadRtt {
346356
ReadRttSt::Idle { pos, len, .. } => {
347357
if *pos == *len {
348358
// Buffer is empty; start reading RTT channels
349-
let (mut buf, mut rtt) = match replace(&mut this.st, ReadRttSt::Invalid) {
350-
ReadRttSt::Idle { buf, rtt, .. } => (buf, rtt),
351-
_ => unreachable!(),
352-
};
359+
let (mut buf, mut rtt, mut session) =
360+
match replace(&mut this.st, ReadRttSt::Invalid) {
361+
ReadRttSt::Idle {
362+
buf, rtt, session, ..
363+
} => (buf, rtt, session),
364+
_ => unreachable!(),
365+
};
353366

354367
let halt_on_access = this.options.halt_on_access;
355-
let session = this.session.clone();
356368

357369
// Reading RTT is a blocking operation, so do it in a
358370
// separate thread
359371
let join_handle = spawn_blocking(move || {
360-
let num_read_bytes =
361-
Self::read_inner(session, &mut rtt, &mut *buf, halt_on_access)?;
372+
let num_read_bytes = Self::read_inner(
373+
&mut session,
374+
&mut rtt,
375+
&mut *buf,
376+
halt_on_access,
377+
)?;
362378

363379
// Send the buffer back to the `ReadRtt`
364-
Ok((buf, num_read_bytes, rtt))
380+
Ok((buf, session, num_read_bytes, rtt))
365381
});
366382

367383
this.st = ReadRttSt::Read { join_handle };
@@ -379,7 +395,7 @@ impl AsyncBufRead for ReadRtt {
379395
}
380396

381397
ReadRttSt::Read { join_handle } => {
382-
let (buf, num_read_bytes, rtt) =
398+
let (buf, session, num_read_bytes, rtt) =
383399
match ready!(Pin::new(join_handle).poll(cx)).unwrap() {
384400
Ok(x) => x,
385401
Err(e) => return Poll::Ready(Err(e)),
@@ -389,12 +405,14 @@ impl AsyncBufRead for ReadRtt {
389405
// If no bytes were read, wait for a while and try again
390406
ReadRttSt::PollDelay {
391407
buf,
408+
session,
392409
rtt,
393410
delay: Box::pin(sleep(POLL_INTERVAL)),
394411
}
395412
} else {
396413
ReadRttSt::Idle {
397414
buf,
415+
session,
398416
rtt,
399417
pos: 0,
400418
len: num_read_bytes,
@@ -405,13 +423,16 @@ impl AsyncBufRead for ReadRtt {
405423
ReadRttSt::PollDelay { delay, .. } => {
406424
ready!(delay.as_mut().poll(cx));
407425

408-
let (buf, rtt) = match replace(&mut this.st, ReadRttSt::Invalid) {
409-
ReadRttSt::PollDelay { buf, rtt, .. } => (buf, rtt),
426+
let (buf, rtt, session) = match replace(&mut this.st, ReadRttSt::Invalid) {
427+
ReadRttSt::PollDelay {
428+
buf, rtt, session, ..
429+
} => (buf, rtt, session),
410430
_ => unreachable!(),
411431
};
412432

413433
this.st = ReadRttSt::Idle {
414434
buf,
435+
session,
415436
rtt,
416437
pos: 0,
417438
len: 0,
@@ -436,15 +457,11 @@ impl AsyncBufRead for ReadRtt {
436457

437458
impl ReadRtt {
438459
fn read_inner(
439-
session: Arc<Mutex<probe_rs::Session>>,
460+
session: &mut probe_rs::Session,
440461
rtt: &mut probe_rs_rtt::Rtt,
441462
buf: &mut [u8],
442463
halt_on_access: bool,
443464
) -> tokio::io::Result<usize> {
444-
// FIXME: Hold the lock in `ReadRtt`; it's pointless to be able to have
445-
// two instances of `ReadRtt` pointing to the same `Session` and
446-
// performing interleaved reads
447-
let mut session = session.lock().unwrap();
448465
let mut core = session
449466
.core(0)
450467
.map_err(|e| tokio::io::Error::new(tokio::io::ErrorKind::Other, e))?;

0 commit comments

Comments
 (0)