Skip to content

Commit a7c93d8

Browse files
committed
Don't exit the locker thread immediately; fix "numpy>=2" in build.yml
1 parent 294467f commit a7c93d8

File tree

3 files changed

+49
-16
lines changed

3 files changed

+49
-16
lines changed

.github/workflows/build.yml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,7 @@ jobs:
4444
with:
4545
python-version: 3.9
4646
- name: Install optional numpy dependency
47-
run: pip install numpy>=2
48-
- uses: mxschmitt/action-tmate@v3
47+
run: pip install "numpy>=2"
4948
- name: Test
5049
id: test
5150
continue-on-error: true

src/python_spy.rs

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::collections::HashSet;
44
#[cfg(all(target_os = "linux", feature = "unwind"))]
55
use std::iter::FromIterator;
66
use std::path::Path;
7+
use std::sync::mpsc::Sender;
78

89
use anyhow::{Context, Error, Result};
910
use remoteprocess::{Pid, Process, ProcessMemory, Tid};
@@ -40,6 +41,20 @@ pub struct PythonSpy {
4041
pub dockerized: bool,
4142
}
4243

44+
/// A small helper which automatically sends a message on an mpsc channel when it is dropped.
45+
///
46+
/// * `release_lock_tx`: An mpsc sender which can send an empty () message.
47+
#[derive(Debug)]
48+
pub struct ProcessLocker {
49+
release_lock_tx: Sender<()>,
50+
}
51+
52+
impl Drop for ProcessLocker {
53+
fn drop(&mut self) {
54+
let _ = self.release_lock_tx.send(());
55+
}
56+
}
57+
4358
impl PythonSpy {
4459
/// Constructs a new PythonSpy object.
4560
pub fn new(pid: Pid, config: &Config) -> Result<PythonSpy, Error> {
@@ -197,16 +212,21 @@ impl PythonSpy {
197212
/// error out gracefully.
198213
///
199214
/// Since `remoteprocess::Process::lock` can hang if the process has exited, it is called in
200-
/// a separate thread which times out (and results in an error) if the thread doesn't lock in
201-
/// the given `lock_timeout_ms`.
215+
/// a separate locker thread which times out (and results in an error) if the target thread
216+
/// doesn't lock in the given `lock_timeout_ms`. The locker thread then waits to exit, holding
217+
/// the lock in memory. When the `ProcessLocker` object returned by this function is dropped,
218+
/// it sends a signal to the locker thread using an mpsc channel which causes the locker thread
219+
/// to finish executing (causing the lock held in that thread to be dropped, unpausing the
220+
/// target thread).
202221
///
203222
/// The approach here follows https://stackoverflow.com/a/36182336/8100451, but with
204223
/// `recv_timeout`.
205224
///
206225
/// * `pid`: ID of the process to lock
207226
/// * `lock_timeout_ms`: Length of time to wait before erroring out
208-
pub fn lock_process_with_timeout(pid: Pid, lock_timeout_ms: u64) -> Result<()> {
209-
let (tx, rx) = std::sync::mpsc::channel();
227+
pub fn lock_process_with_timeout(pid: Pid, lock_timeout_ms: u64) -> Result<ProcessLocker> {
228+
let (acquire_lock_tx, acquire_lock_rx) = std::sync::mpsc::channel();
229+
let (release_lock_tx, release_lock_rx) = std::sync::mpsc::channel::<()>();
210230

211231
// Generate a Process instance so it can be moved to the child thread
212232
let process = remoteprocess::Process::new(pid).unwrap_or_else(|_| {
@@ -218,18 +238,28 @@ impl PythonSpy {
218238
// this, because we don't care (this only happens when the worker thread has
219239
// already exceeded the timeout).
220240
match process.lock() {
221-
Ok(_) => tx.send(Ok(())),
222-
Err(error) => tx.send(Err(error)),
241+
Ok(_lock) => {
242+
let _ = acquire_lock_tx.send(Ok(()));
243+
244+
// Wait until instructed to finish execution (and drop the lock, which
245+
// unlocks the process)
246+
let _ = release_lock_rx.recv();
247+
}
248+
Err(error) => {
249+
let _ = acquire_lock_tx.send(Err(error));
250+
}
223251
}
224252
});
225253

226-
rx.recv_timeout(std::time::Duration::from_millis(lock_timeout_ms))
254+
acquire_lock_rx
255+
.recv_timeout(std::time::Duration::from_millis(lock_timeout_ms))
227256
.context(format!("Timeout acquiring lock on process {pid}"))
228257
.inspect_err(|_| {
229258
drop(join_handle);
230-
drop(rx);
259+
drop(acquire_lock_rx);
231260
})?
232-
.context(format!("Failed to suspend process {pid}"))
261+
.context(format!("Failed to suspend process {pid}"))?;
262+
Ok(ProcessLocker { release_lock_tx })
233263
}
234264

235265
// implementation of get_stack_traces, where we have a type for the InterpreterState

tests/integration_test.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ fn test_recursive() {
193193
assert!(trace.frames.len() <= 22);
194194

195195
let top_level_frame = &trace.frames[trace.frames.len() - 1];
196+
trace.frames.iter().for_each(|item| println!("{:?}", item));
196197
assert_eq!(top_level_frame.name, "<module>");
197198
assert!((top_level_frame.line == 8) || (top_level_frame.line == 7));
198199

@@ -577,8 +578,11 @@ fn test_hanging_lock_successful() {
577578
.expect("Should be able to read");
578579
assert_eq!(buffer, "awaiting input\n");
579580

580-
let result = PythonSpy::lock_process_with_timeout(child.id().try_into().unwrap(), 1000);
581-
assert!(result.is_ok());
581+
let locker = PythonSpy::lock_process_with_timeout(child.id().try_into().unwrap(), 1000);
582+
assert!(locker.is_ok());
583+
584+
// Need to drop the locker in order to unpause the child process
585+
drop(locker);
582586

583587
let mut child_stdin = child
584588
.stdin
@@ -616,9 +620,9 @@ fn test_hanging_lock_failure() {
616620
.expect("Child process doesn't have stdin handle");
617621
let _ = child_stdin.write_all(b"continue\n");
618622

619-
let result = PythonSpy::lock_process_with_timeout(child.id().try_into().unwrap(), 1000);
620-
assert!(result.is_err());
621-
assert!(result
623+
let locker = PythonSpy::lock_process_with_timeout(child.id().try_into().unwrap(), 1000);
624+
assert!(locker.is_err());
625+
assert!(locker
622626
.unwrap_err()
623627
.to_string()
624628
.contains("Failed to suspend process"));

0 commit comments

Comments
 (0)