Skip to content

Commit 08a7449

Browse files
committed
Avoid futures::executor::block_on in coordinator <-> worker IO
As Alice Rhyl recommended (slightly reworded) > Either you can use `tokio::runtime::Handle::block_on` [instead of > `futures::executor::block_on`] or your program has a subtle > deadlock.
1 parent 0f564e6 commit 08a7449

File tree

2 files changed

+7
-3
lines changed

2 files changed

+7
-3
lines changed

compiler/base/orchestrator/src/coordinator.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2848,8 +2848,10 @@ fn spawn_io_queue(stdin: ChildStdin, stdout: ChildStdout, token: CancellationTok
28482848
let stdin = SyncIoBridge::new(stdin);
28492849
let mut stdin = BufWriter::new(stdin);
28502850

2851+
let handle = tokio::runtime::Handle::current();
2852+
28512853
loop {
2852-
let coordinator_msg = futures::executor::block_on(async {
2854+
let coordinator_msg = handle.block_on(async {
28532855
select! {
28542856
() = token.cancelled() => None,
28552857
msg = rx.recv() => msg,

compiler/base/orchestrator/src/worker.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,8 @@ impl ProcessState {
450450
let statistics_task = tokio::task::spawn_blocking({
451451
let child_id = child.id();
452452
let worker_msg_tx = worker_msg_tx.clone();
453-
move || stream_command_statistics(child_id, worker_msg_tx)
453+
let handle = tokio::runtime::Handle::current();
454+
move || stream_command_statistics(child_id, worker_msg_tx, handle)
454455
});
455456

456457
let task_set = stream_stdio(worker_msg_tx.clone(), stdin_rx, stdin, stdout, stderr);
@@ -943,6 +944,7 @@ mod stats {
943944
fn stream_command_statistics(
944945
child_id: Option<u32>,
945946
worker_msg_tx: MultiplexingSender,
947+
handle: tokio::runtime::Handle,
946948
) -> Result<(), CommandStatisticsError> {
947949
use command_statistics_error::*;
948950
use stats::*;
@@ -959,7 +961,7 @@ fn stream_command_statistics(
959961
let process = Process::new(process_id).context(InvalidProcessSnafu { process_id })?;
960962

961963
while let Some(stats) = process.stats() {
962-
let sent = futures::executor::block_on(worker_msg_tx.send_ok(stats));
964+
let sent = handle.block_on(worker_msg_tx.send_ok(stats));
963965
if sent.is_err() {
964966
// No one listening anymore
965967
break;

0 commit comments

Comments
 (0)