Skip to content

Commit 12786c3

Browse files
fix(VER-2346): stop reading stout/err if subprocess is killed
1 parent 26e348c commit 12786c3

File tree

3 files changed

+73
-11
lines changed

3 files changed

+73
-11
lines changed

rs/tests/src/driver/process.rs

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use std::process::{Command, ExitStatus, Stdio};
77
use tokio::{
88
io::{AsyncBufReadExt, AsyncRead, BufReader},
99
process::{Child, Command as AsyncCommand},
10+
select,
11+
sync::watch::{channel, Receiver},
1012
task::{self, JoinHandle},
1113
};
1214

@@ -31,19 +33,21 @@ impl Process {
3133
// signal.
3234
cmd.kill_on_drop(true);
3335

34-
// println!("AsyncCommand: {:?}", cmd);
3536
let mut child = cmd.spawn().expect("Spawning subprocess should succeed");
36-
// println!("Child: {:?}", child);
3737

3838
let stdout = child.stdout.take().unwrap();
39+
let (kill_tx, kill_watch) = channel::<bool>(false);
40+
3941
let stdout_jh = task::spawn(Self::listen_on_channel(
42+
kill_watch.clone(),
4043
task_id.clone(),
4144
log.clone(),
4245
ChannelName::StdOut,
4346
stdout,
4447
));
4548
let stderr = child.stderr.take().unwrap();
4649
let stderr_jh = task::spawn(Self::listen_on_channel(
50+
kill_watch,
4751
task_id,
4852
log.clone(),
4953
ChannelName::StdErr,
@@ -60,6 +64,7 @@ impl Process {
6064
let pid = Pid::from_raw(self_.child.id().unwrap() as i32);
6165
move || {
6266
let _ = kill(pid, Signal::SIGKILL);
67+
let _ = kill_tx.send(true);
6368
}
6469
};
6570

@@ -77,21 +82,34 @@ impl Process {
7782
child.wait().await
7883
}
7984

80-
async fn listen_on_channel<R>(task_id: TaskId, log: Logger, channel_tag: ChannelName, src: R)
81-
where
85+
async fn listen_on_channel<R>(
86+
mut kill_watch: Receiver<bool>,
87+
task_id: TaskId,
88+
log: Logger,
89+
channel_tag: ChannelName,
90+
src: R,
91+
) where
8292
R: AsyncRead + Unpin,
8393
{
8494
let buffered_reader = BufReader::new(src);
8595
let mut lines = buffered_reader.lines();
8696
loop {
87-
match lines.next_line().await {
88-
Ok(Some(line)) => {
89-
let task_id: String = format!("{}", task_id);
90-
let output_channel: String = format!("{:?}", channel_tag);
91-
info!(log, "{}", line; "task_id" => task_id, "output_channel" => output_channel)
97+
select! {
98+
line_res = lines.next_line() => {
99+
match line_res {
100+
Ok(Some(line)) => {
101+
let task_id: String = format!("{}", task_id);
102+
let output_channel: String = format!("{:?}", channel_tag);
103+
info!(log, "{}", line; "task_id" => task_id, "output_channel" => output_channel)
104+
}
105+
Ok(None) => break,
106+
Err(e) => eprintln!("listen_on_channel(): {:?}", e),
107+
}
108+
}
109+
_ = kill_watch.changed() => {
110+
info!(log, "({}|{:?}): Kill received.", task_id, channel_tag);
111+
return;
92112
}
93-
Ok(None) => break,
94-
Err(e) => eprintln!("listen_on_channel(): {:?}", e),
95113
}
96114
}
97115
info!(log, "({}|{:?}): Channel has closed.", task_id, channel_tag);

rs/tests/testing_verification/test_driver_e2e_scenarios.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,14 @@ fn get_all_e2e_test_scenarios() -> HashMap<String, SystemTestGroup> {
167167
.add_test(systest!(test_to_succeed))
168168
.without_farm(),
169169
),
170+
(
171+
"test_child_process".to_string(),
172+
SystemTestGroup::new()
173+
.with_timeout_per_test(Duration::from_secs(5))
174+
.with_setup(setup_to_succeed)
175+
.add_test(systest!(spawning_process))
176+
.without_farm(),
177+
),
170178
])
171179
}
172180

@@ -228,3 +236,13 @@ fn never_ending_task(env: TestEnv) {
228236
std::thread::sleep(Duration::from_secs(1));
229237
}
230238
}
239+
240+
// Simulate a scenario where a test that starts a subprocess gets stopped by a timeout in the test
241+
// driver.
242+
fn spawning_process(_env: TestEnv) {
243+
let _ = std::process::Command::new("sh")
244+
.arg("-c")
245+
.arg("for i in `seq 100`; do echo magicchild$i; sleep 1; done")
246+
.spawn();
247+
std::thread::sleep(std::time::Duration::from_secs(100));
248+
}

rs/tests/testing_verification/test_driver_tests.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -496,6 +496,32 @@ fn test_overall_group_timeout_in_setup() {
496496
);
497497
}
498498

499+
#[test]
500+
fn test_test_spawning_proc_gets_stopped() {
501+
let result = execute_test_scenario_with_default_cmd("test_child_process");
502+
assert!(
503+
!result.status.success(),
504+
"{}",
505+
String::from_utf8_lossy(&result.stderr)
506+
);
507+
508+
let err_str = String::from_utf8_lossy(&result.stderr);
509+
// The process started by the test print a string magicchild1 at second 1, magicchild2 at
510+
// second 2, etc. The per test timeout is 5 seconds, so, 'magicchild1' should be visible, but
511+
// not 'magicchild10'.
512+
// Further, we test that the test returns even though the child process of the test keeps
513+
// stdout/err open.
514+
assert!(err_str.contains("magicchild1"));
515+
assert!(!err_str.contains("magicchild10"));
516+
let summary = extract_report(result.stderr).expect("Failed to extract report from logs.");
517+
assert_test_summary_size(&summary, 1, 1, 0);
518+
assert_name_and_message_eq(
519+
&summary.failure[0],
520+
"spawning_process",
521+
Some("Timeout after 5s"),
522+
);
523+
}
524+
499525
#[test]
500526
fn test_setup_failure_file_written() {
501527
let working_dir = create_unique_working_dir();

0 commit comments

Comments
 (0)