Skip to content

Commit 2b43e89

Browse files
authored
[update-engine] use cancel_safe_futures::coop_cancel (#3795)
An `AbortHandle` turns into a pretty thin wrapper around `coop_cancel::Canceler`.
1 parent 78bcbc1 commit 2b43e89

File tree

5 files changed

+57
-61
lines changed

5 files changed

+57
-61
lines changed

Cargo.lock

Lines changed: 4 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ bytes = "1.4.0"
146146
bytesize = "1.2.0"
147147
camino = "1.1"
148148
camino-tempfile = "1.0.2"
149-
cancel-safe-futures = "0.1.1"
149+
cancel-safe-futures = "0.1.2"
150150
chacha20poly1305 = "0.10.1"
151151
ciborium = "0.2.1"
152152
cfg-if = "1.0"

update-engine/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ license = "MPL-2.0"
66

77
[dependencies]
88
anyhow.workspace = true
9+
cancel-safe-futures.workspace = true
910
debug-ignore.workspace = true
1011
derive-where.workspace = true
1112
either.workspace = true

update-engine/src/engine.rs

Lines changed: 40 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@ use std::{
1313
atomic::{AtomicUsize, Ordering},
1414
Mutex,
1515
},
16-
task::{ready, Poll},
16+
task::Poll,
1717
};
1818

19+
use cancel_safe_futures::coop_cancel;
1920
use debug_ignore::DebugIgnore;
2021
use derive_where::derive_where;
2122
use futures::{future::BoxFuture, prelude::*};
@@ -74,8 +75,8 @@ pub struct UpdateEngine<'a, S: StepSpec> {
7475
sender: mpsc::Sender<Event<S>>,
7576

7677
// This is set to None in Self::execute.
77-
abort_sender: Option<mpsc::UnboundedSender<AbortMessage>>,
78-
abort_receiver: mpsc::UnboundedReceiver<AbortMessage>,
78+
canceler: Option<coop_cancel::Canceler<String>>,
79+
cancel_receiver: coop_cancel::Receiver<String>,
7980

8081
// This is a mutex to allow borrows to steps to be held by both
8182
// ComponentRegistrar and NewStep at the same time. (This could also be a
@@ -90,16 +91,16 @@ impl<'a, S: StepSpec + 'a> UpdateEngine<'a, S> {
9091
/// Creates a new `UpdateEngine`.
9192
pub fn new(log: &slog::Logger, sender: mpsc::Sender<Event<S>>) -> Self {
9293
let execution_id = ExecutionId(Uuid::new_v4());
93-
let (abort_sender, abort_receiver) = mpsc::unbounded_channel();
94+
let (canceler, cancel_receiver) = coop_cancel::new_pair();
9495
Self {
9596
log: log.new(slog::o!(
9697
"component" => "UpdateEngine",
9798
"execution_id" => format!("{execution_id}"),
9899
)),
99100
execution_id: ExecutionId(Uuid::new_v4()),
100101
sender,
101-
abort_sender: Some(abort_sender),
102-
abort_receiver,
102+
canceler: Some(canceler),
103+
cancel_receiver,
103104
steps: Default::default(),
104105
}
105106
}
@@ -156,8 +157,8 @@ impl<'a, S: StepSpec + 'a> UpdateEngine<'a, S> {
156157
/// An abort handle can be used to forcibly cancel update engine executions.
157158
pub fn abort_handle(&self) -> AbortHandle {
158159
AbortHandle {
159-
abort_sender: self
160-
.abort_sender
160+
canceler: self
161+
.canceler
161162
.as_ref()
162163
.expect("abort_sender should always be present")
163164
.clone(),
@@ -169,11 +170,11 @@ impl<'a, S: StepSpec + 'a> UpdateEngine<'a, S> {
169170
/// This returns an `ExecutionHandle`, which needs to be awaited on to drive
170171
/// the engine forward.
171172
pub fn execute(mut self) -> ExecutionHandle<'a, S> {
172-
let abort_sender = self
173-
.abort_sender
173+
let canceler = self
174+
.canceler
174175
.take()
175176
.expect("execute is the only function which does this");
176-
let abort_handle = AbortHandle { abort_sender };
177+
let abort_handle = AbortHandle { canceler };
177178

178179
let engine_fut = self.execute_impl().boxed();
179180

@@ -278,7 +279,7 @@ impl<'a, S: StepSpec + 'a> UpdateEngine<'a, S> {
278279

279280
let (mut step_res, mut reporter) = first_step
280281
.exec
281-
.execute(&self.log, step_exec_cx, &mut self.abort_receiver)
282+
.execute(&self.log, step_exec_cx, &mut self.cancel_receiver)
282283
.await?;
283284

284285
// Now run all remaining steps.
@@ -299,7 +300,7 @@ impl<'a, S: StepSpec + 'a> UpdateEngine<'a, S> {
299300

300301
(step_res, reporter) = step
301302
.exec
302-
.execute(&self.log, step_exec_cx, &mut self.abort_receiver)
303+
.execute(&self.log, step_exec_cx, &mut self.cancel_receiver)
303304
.await?;
304305
}
305306

@@ -328,10 +329,15 @@ impl<'a, S: StepSpec> ExecutionHandle<'a, S> {
328329
/// This sends the message immediately, and returns a future that can be
329330
/// optionally waited against to block until the abort is processed.
330331
///
331-
/// If this engine is still running, it is immediately aborted. The engine
332-
/// sends an `ExecutionAborted` message over the wire, and an
332+
/// If this engine is still running, it is aborted at the next await point.
333+
/// The engine sends an `ExecutionAborted` message over the wire, and an
333334
/// `ExecutionError::Aborted` is returned.
334-
pub fn abort(&self, message: impl Into<String>) -> AbortWaiter {
335+
///
336+
/// Returns `Err(message)` if the engine has already completed execution.
337+
pub fn abort(
338+
&self,
339+
message: impl Into<String>,
340+
) -> Result<AbortWaiter, String> {
335341
self.abort_handle.abort(message.into())
336342
}
337343

@@ -357,9 +363,7 @@ impl<'a, S: StepSpec> Future for ExecutionHandle<'a, S> {
357363
/// An abort handle, used to forcibly cancel update engine executions.
358364
#[derive(Clone, Debug)]
359365
pub struct AbortHandle {
360-
// This is an unbounded sender to make Self::abort not async. In general we
361-
// don't expect more than one message to ever be sent on this channel.
362-
abort_sender: mpsc::UnboundedSender<AbortMessage>,
366+
canceler: coop_cancel::Canceler<String>,
363367
}
364368

365369
impl AbortHandle {
@@ -368,15 +372,17 @@ impl AbortHandle {
368372
/// This sends the message immediately, and returns a future that can be
369373
/// optionally waited against to block until the abort is processed.
370374
///
371-
/// If this engine is still running, it is immediately aborted. The engine
372-
/// sends an `ExecutionAborted` message over the wire, and an
375+
/// If this engine is still running, it is aborted at the next await point.
376+
/// The engine sends an `ExecutionAborted` message over the wire, and an
373377
/// `ExecutionError::Aborted` is returned.
374-
pub fn abort(&self, message: impl Into<String>) -> AbortWaiter {
375-
// Ignore errors here because if the receiver is closed, the engine has
376-
// completed (or failed) execution.
377-
let (message, processed_receiver) = AbortMessage::new(message.into());
378-
_ = self.abort_sender.send(message);
379-
AbortWaiter { processed_receiver }
378+
///
379+
/// Returns `Err(message)` if the engine has already completed execution.
380+
pub fn abort(
381+
&self,
382+
message: impl Into<String>,
383+
) -> Result<AbortWaiter, String> {
384+
let waiter = self.canceler.cancel(message.into())?;
385+
Ok(AbortWaiter { waiter })
380386
}
381387
}
382388

@@ -386,7 +392,7 @@ impl AbortHandle {
386392
/// Dropping this future does not cancel the abort.
387393
#[derive(Debug)]
388394
pub struct AbortWaiter {
389-
processed_receiver: oneshot::Receiver<()>,
395+
waiter: coop_cancel::Waiter<String>,
390396
}
391397

392398
impl Future for AbortWaiter {
@@ -396,24 +402,7 @@ impl Future for AbortWaiter {
396402
mut self: Pin<&mut Self>,
397403
cx: &mut std::task::Context<'_>,
398404
) -> Poll<Self::Output> {
399-
// The return value of the receiver doesn't matter. If it's an error, it
400-
// means that the sender was dropped, which means that execution
401-
// finished.
402-
_ = ready!(self.as_mut().processed_receiver.poll_unpin(cx));
403-
Poll::Ready(())
404-
}
405-
}
406-
407-
#[derive(Debug)]
408-
struct AbortMessage {
409-
message: String,
410-
processed_sender: oneshot::Sender<()>,
411-
}
412-
413-
impl AbortMessage {
414-
fn new(message: String) -> (Self, oneshot::Receiver<()>) {
415-
let (processed_sender, processed_receiver) = oneshot::channel();
416-
(Self { message, processed_sender }, processed_receiver)
405+
self.waiter.poll_unpin(cx)
417406
}
418407
}
419408

@@ -786,7 +775,7 @@ impl<'a, S: StepSpec> StepExec<'a, S> {
786775
self,
787776
log: &slog::Logger,
788777
step_exec_cx: StepExecutionContext<S, F>,
789-
abort_receiver: &mut mpsc::UnboundedReceiver<AbortMessage>,
778+
cancel_receiver: &mut coop_cancel::Receiver<String>,
790779
) -> Result<
791780
(Result<StepOutcome<S>, S::Error>, StepProgressReporter<S, F>),
792781
ExecutionError<S>,
@@ -859,7 +848,7 @@ impl<'a, S: StepSpec> StepExec<'a, S> {
859848
}
860849
}
861850

862-
Some(message) = abort_receiver.recv() => {
851+
Some(message) = cancel_receiver.recv() => {
863852
return Err(reporter.handle_abort(message).await);
864853
}
865854
}
@@ -1080,7 +1069,7 @@ impl<S: StepSpec, F: Fn() -> usize> StepProgressReporter<S, F> {
10801069
}
10811070
}
10821071

1083-
async fn handle_abort(self, message: AbortMessage) -> ExecutionError<S> {
1072+
async fn handle_abort(self, message: String) -> ExecutionError<S> {
10841073
// Send the abort message over the channel.
10851074
//
10861075
// The only way this can fail is if the event receiver is closed or
@@ -1098,21 +1087,17 @@ impl<S: StepSpec, F: Fn() -> usize> StepProgressReporter<S, F> {
10981087
attempt: self.attempt,
10991088
step_elapsed: self.step_start.elapsed(),
11001089
attempt_elapsed: self.attempt_start.elapsed(),
1101-
message: message.message.clone(),
1090+
message: message.clone(),
11021091
},
11031092
}))
11041093
.await;
11051094

1106-
// An error here doesn't matter -- it just means that the abort handle
1107-
// was dropped.
1108-
_ = message.processed_sender.send(());
1109-
11101095
match res {
11111096
Ok(()) => ExecutionError::Aborted {
11121097
component: self.step_info.info.component.clone(),
11131098
id: self.step_info.info.id.clone(),
11141099
description: self.step_info.info.description.clone(),
1115-
message: message.message,
1100+
message: message,
11161101
},
11171102
Err(error) => error.into(),
11181103
}

wicketd/src/update_tracker.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -462,9 +462,17 @@ impl UpdateTrackerData {
462462
return Err(AbortUpdateError::UpdateFinished);
463463
}
464464

465-
let waiter = update_data.abort_handle.abort(message);
466-
waiter.await;
467-
Ok(())
465+
match update_data.abort_handle.abort(message) {
466+
Ok(waiter) => {
467+
waiter.await;
468+
Ok(())
469+
}
470+
Err(_) => {
471+
// This occurs if the engine has finished execution and has been
472+
// dropped.
473+
Err(AbortUpdateError::UpdateFinished)
474+
}
475+
}
468476
}
469477

470478
fn put_repository(&mut self, bytes: BufList) -> Result<(), HttpError> {

0 commit comments

Comments
 (0)