Skip to content

Commit 036f2b8

Browse files
committed
Trigger CancellationTokens on drop
We'd previously done this for one specific usage via the `CancelOnDrop` wrapper. This works for the more complicated case where we need to continue to clone the token. Simpler cases can use the `DropGuard` directly.
1 parent e226d44 commit 036f2b8

File tree

2 files changed

+13
-10
lines changed

2 files changed

+13
-10
lines changed

compiler/base/orchestrator/src/worker.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use tokio::{
4646
sync::mpsc,
4747
task::JoinSet,
4848
};
49-
use tokio_util::sync::CancellationToken;
49+
use tokio_util::sync::{CancellationToken, DropGuard};
5050

5151
use crate::{
5252
bincode_input_closed,
@@ -405,7 +405,7 @@ struct ProcessState {
405405
processes: JoinSet<Result<(), ProcessError>>,
406406
stdin_senders: HashMap<JobId, mpsc::Sender<String>>,
407407
stdin_shutdown_tx: mpsc::Sender<JobId>,
408-
kill_tokens: HashMap<JobId, CancellationToken>,
408+
kill_tokens: HashMap<JobId, DropGuard>,
409409
}
410410

411411
impl ProcessState {
@@ -458,7 +458,7 @@ impl ProcessState {
458458

459459
let task_set = stream_stdio(worker_msg_tx.clone(), stdin_rx, stdin, stdout, stderr);
460460

461-
self.kill_tokens.insert(job_id, token.clone());
461+
self.kill_tokens.insert(job_id, token.clone().drop_guard());
462462

463463
self.processes.spawn({
464464
let stdin_shutdown_tx = self.stdin_shutdown_tx.clone();
@@ -510,8 +510,8 @@ impl ProcessState {
510510
}
511511

512512
fn kill(&mut self, job_id: JobId) {
513-
if let Some(token) = self.kill_tokens.get(&job_id) {
514-
token.cancel();
513+
if let Some(token) = self.kill_tokens.remove(&job_id) {
514+
drop(token);
515515
}
516516
}
517517
}

ui/src/server_axum/websocket.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use tokio::{
2929
task::{AbortHandle, JoinSet},
3030
time,
3131
};
32-
use tokio_util::sync::CancellationToken;
32+
use tokio_util::sync::{CancellationToken, DropGuard};
3333
use tracing::{error, info, instrument, warn, Instrument};
3434

3535
#[derive(Debug, serde::Deserialize, serde::Serialize)]
@@ -525,7 +525,7 @@ async fn handle_idle(manager: &mut CoordinatorManager, tx: &ResponseTx) -> Contr
525525
ControlFlow::Continue(())
526526
}
527527

528-
type ActiveExecutionInfo = (CancellationToken, Option<mpsc::Sender<String>>);
528+
type ActiveExecutionInfo = (DropGuard, Option<mpsc::Sender<String>>);
529529

530530
async fn handle_msg(
531531
txt: &str,
@@ -545,7 +545,10 @@ async fn handle_msg(
545545

546546
let guard = db.clone().start_with_guard("ws.Execute", txt).await;
547547

548-
active_executions.insert(meta.sequence_number, (token.clone(), Some(execution_tx)));
548+
active_executions.insert(
549+
meta.sequence_number,
550+
(token.clone().drop_guard(), Some(execution_tx)),
551+
);
549552

550553
// TODO: Should a single execute / build / etc. session have a timeout of some kind?
551554
let spawned = manager
@@ -602,11 +605,11 @@ async fn handle_msg(
602605
}
603606

604607
Ok(ExecuteKill { meta }) => {
605-
let Some((token, _)) = active_executions.get(&meta.sequence_number) else {
608+
let Some((token, _)) = active_executions.remove(&meta.sequence_number) else {
606609
warn!("Received kill for an execution that is no longer active");
607610
return;
608611
};
609-
token.cancel();
612+
drop(token);
610613
}
611614

612615
Err(e) => {

0 commit comments

Comments
 (0)