From 4bf0dd80c40ddaf99aa38b0ca95e664dc30c1eae Mon Sep 17 00:00:00 2001 From: Rain Date: Fri, 28 Jul 2023 20:17:04 -0700 Subject: [PATCH] =?UTF-8?q?[=F0=9D=98=80=F0=9D=97=BD=F0=9D=97=BF]=20change?= =?UTF-8?q?s=20to=20main=20this=20commit=20is=20based=20on?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Created using spr 1.3.4 [skip ci] --- Cargo.lock | 6 ++- Cargo.toml | 2 +- update-engine/Cargo.toml | 1 + update-engine/src/engine.rs | 95 +++++++++++++++-------------------- wicketd/src/update_tracker.rs | 14 ++++-- 5 files changed, 57 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4db8742a094..9b42170d728 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -790,14 +790,15 @@ dependencies = [ [[package]] name = "cancel-safe-futures" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20e5267a71ddf9a29ae221073d32f42ee6665be1a61134ff47e7275a250b8d22" +checksum = "84f4df1b54bc954b71be3baaa0771d7fbfebc0b291d2875892f49287f3b5c73d" dependencies = [ "futures-core", "futures-sink", "futures-util", "pin-project-lite", + "tokio", ] [[package]] @@ -8915,6 +8916,7 @@ dependencies = [ "bytes", "camino", "camino-tempfile", + "cancel-safe-futures", "debug-ignore", "derive-where", "either", diff --git a/Cargo.toml b/Cargo.toml index b8e00006e89..b9d710ef114 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -146,7 +146,7 @@ bytes = "1.4.0" bytesize = "1.2.0" camino = "1.1" camino-tempfile = "1.0.2" -cancel-safe-futures = "0.1.1" +cancel-safe-futures = "0.1.2" chacha20poly1305 = "0.10.1" ciborium = "0.2.1" cfg-if = "1.0" diff --git a/update-engine/Cargo.toml b/update-engine/Cargo.toml index 19ce66f81bb..4c2841cf0ff 100644 --- a/update-engine/Cargo.toml +++ b/update-engine/Cargo.toml @@ -6,6 +6,7 @@ license = "MPL-2.0" [dependencies] anyhow.workspace = true +cancel-safe-futures.workspace = true debug-ignore.workspace = true derive-where.workspace = true either.workspace = true diff --git a/update-engine/src/engine.rs b/update-engine/src/engine.rs index e5d4748affa..bd947a1cb10 100644 --- a/update-engine/src/engine.rs +++ b/update-engine/src/engine.rs @@ -13,9 +13,10 @@ use std::{ atomic::{AtomicUsize, Ordering}, Mutex, }, - task::{ready, Poll}, + task::Poll, }; +use cancel_safe_futures::coop_cancel; use debug_ignore::DebugIgnore; use derive_where::derive_where; use futures::{future::BoxFuture, prelude::*}; @@ -74,8 +75,8 @@ pub struct UpdateEngine<'a, S: StepSpec> { sender: mpsc::Sender>, // This is set to None in Self::execute. - abort_sender: Option>, - abort_receiver: mpsc::UnboundedReceiver, + canceler: Option>, + cancel_receiver: coop_cancel::Receiver, // This is a mutex to allow borrows to steps to be held by both // ComponentRegistrar and NewStep at the same time. (This could also be a @@ -90,7 +91,7 @@ impl<'a, S: StepSpec + 'a> UpdateEngine<'a, S> { /// Creates a new `UpdateEngine`. pub fn new(log: &slog::Logger, sender: mpsc::Sender>) -> Self { let execution_id = ExecutionId(Uuid::new_v4()); - let (abort_sender, abort_receiver) = mpsc::unbounded_channel(); + let (canceler, cancel_receiver) = coop_cancel::new_pair(); Self { log: log.new(slog::o!( "component" => "UpdateEngine", @@ -98,8 +99,8 @@ impl<'a, S: StepSpec + 'a> UpdateEngine<'a, S> { )), execution_id: ExecutionId(Uuid::new_v4()), sender, - abort_sender: Some(abort_sender), - abort_receiver, + canceler: Some(canceler), + cancel_receiver, steps: Default::default(), } } @@ -156,8 +157,8 @@ impl<'a, S: StepSpec + 'a> UpdateEngine<'a, S> { /// An abort handle can be used to forcibly cancel update engine executions. pub fn abort_handle(&self) -> AbortHandle { AbortHandle { - abort_sender: self - .abort_sender + canceler: self + .canceler .as_ref() .expect("abort_sender should always be present") .clone(), @@ -169,11 +170,11 @@ impl<'a, S: StepSpec + 'a> UpdateEngine<'a, S> { /// This returns an `ExecutionHandle`, which needs to be awaited on to drive /// the engine forward. pub fn execute(mut self) -> ExecutionHandle<'a, S> { - let abort_sender = self - .abort_sender + let canceler = self + .canceler .take() .expect("execute is the only function which does this"); - let abort_handle = AbortHandle { abort_sender }; + let abort_handle = AbortHandle { canceler }; let engine_fut = self.execute_impl().boxed(); @@ -278,7 +279,7 @@ impl<'a, S: StepSpec + 'a> UpdateEngine<'a, S> { let (mut step_res, mut reporter) = first_step .exec - .execute(&self.log, step_exec_cx, &mut self.abort_receiver) + .execute(&self.log, step_exec_cx, &mut self.cancel_receiver) .await?; // Now run all remaining steps. @@ -299,7 +300,7 @@ impl<'a, S: StepSpec + 'a> UpdateEngine<'a, S> { (step_res, reporter) = step .exec - .execute(&self.log, step_exec_cx, &mut self.abort_receiver) + .execute(&self.log, step_exec_cx, &mut self.cancel_receiver) .await?; } @@ -328,10 +329,15 @@ impl<'a, S: StepSpec> ExecutionHandle<'a, S> { /// This sends the message immediately, and returns a future that can be /// optionally waited against to block until the abort is processed. /// - /// If this engine is still running, it is immediately aborted. The engine - /// sends an `ExecutionAborted` message over the wire, and an + /// If this engine is still running, it is aborted at the next await point. + /// The engine sends an `ExecutionAborted` message over the wire, and an /// `ExecutionError::Aborted` is returned. - pub fn abort(&self, message: impl Into) -> AbortWaiter { + /// + /// Returns `Err(message)` if the engine has already completed execution. + pub fn abort( + &self, + message: impl Into, + ) -> Result { self.abort_handle.abort(message.into()) } @@ -357,9 +363,7 @@ impl<'a, S: StepSpec> Future for ExecutionHandle<'a, S> { /// An abort handle, used to forcibly cancel update engine executions. #[derive(Clone, Debug)] pub struct AbortHandle { - // This is an unbounded sender to make Self::abort not async. In general we - // don't expect more than one message to ever be sent on this channel. - abort_sender: mpsc::UnboundedSender, + canceler: coop_cancel::Canceler, } impl AbortHandle { @@ -368,15 +372,17 @@ impl AbortHandle { /// This sends the message immediately, and returns a future that can be /// optionally waited against to block until the abort is processed. /// - /// If this engine is still running, it is immediately aborted. The engine - /// sends an `ExecutionAborted` message over the wire, and an + /// If this engine is still running, it is aborted at the next await point. + /// The engine sends an `ExecutionAborted` message over the wire, and an /// `ExecutionError::Aborted` is returned. - pub fn abort(&self, message: impl Into) -> AbortWaiter { - // Ignore errors here because if the receiver is closed, the engine has - // completed (or failed) execution. - let (message, processed_receiver) = AbortMessage::new(message.into()); - _ = self.abort_sender.send(message); - AbortWaiter { processed_receiver } + /// + /// Returns `Err(message)` if the engine has already completed execution. + pub fn abort( + &self, + message: impl Into, + ) -> Result { + let waiter = self.canceler.cancel(message.into())?; + Ok(AbortWaiter { waiter }) } } @@ -386,7 +392,7 @@ impl AbortHandle { /// Dropping this future does not cancel the abort. #[derive(Debug)] pub struct AbortWaiter { - processed_receiver: oneshot::Receiver<()>, + waiter: coop_cancel::Waiter, } impl Future for AbortWaiter { @@ -396,24 +402,7 @@ impl Future for AbortWaiter { mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> Poll { - // The return value of the receiver doesn't matter. If it's an error, it - // means that the sender was dropped, which means that execution - // finished. - _ = ready!(self.as_mut().processed_receiver.poll_unpin(cx)); - Poll::Ready(()) - } -} - -#[derive(Debug)] -struct AbortMessage { - message: String, - processed_sender: oneshot::Sender<()>, -} - -impl AbortMessage { - fn new(message: String) -> (Self, oneshot::Receiver<()>) { - let (processed_sender, processed_receiver) = oneshot::channel(); - (Self { message, processed_sender }, processed_receiver) + self.waiter.poll_unpin(cx) } } @@ -786,7 +775,7 @@ impl<'a, S: StepSpec> StepExec<'a, S> { self, log: &slog::Logger, step_exec_cx: StepExecutionContext, - abort_receiver: &mut mpsc::UnboundedReceiver, + cancel_receiver: &mut coop_cancel::Receiver, ) -> Result< (Result, S::Error>, StepProgressReporter), ExecutionError, @@ -859,7 +848,7 @@ impl<'a, S: StepSpec> StepExec<'a, S> { } } - Some(message) = abort_receiver.recv() => { + Some(message) = cancel_receiver.recv() => { return Err(reporter.handle_abort(message).await); } } @@ -1080,7 +1069,7 @@ impl usize> StepProgressReporter { } } - async fn handle_abort(self, message: AbortMessage) -> ExecutionError { + async fn handle_abort(self, message: String) -> ExecutionError { // Send the abort message over the channel. // // The only way this can fail is if the event receiver is closed or @@ -1098,21 +1087,17 @@ impl usize> StepProgressReporter { attempt: self.attempt, step_elapsed: self.step_start.elapsed(), attempt_elapsed: self.attempt_start.elapsed(), - message: message.message.clone(), + message: message.clone(), }, })) .await; - // An error here doesn't matter -- it just means that the abort handle - // was dropped. - _ = message.processed_sender.send(()); - match res { Ok(()) => ExecutionError::Aborted { component: self.step_info.info.component.clone(), id: self.step_info.info.id.clone(), description: self.step_info.info.description.clone(), - message: message.message, + message: message, }, Err(error) => error.into(), } diff --git a/wicketd/src/update_tracker.rs b/wicketd/src/update_tracker.rs index 45540f45af9..1e0b3c03ed2 100644 --- a/wicketd/src/update_tracker.rs +++ b/wicketd/src/update_tracker.rs @@ -462,9 +462,17 @@ impl UpdateTrackerData { return Err(AbortUpdateError::UpdateFinished); } - let waiter = update_data.abort_handle.abort(message); - waiter.await; - Ok(()) + match update_data.abort_handle.abort(message) { + Ok(waiter) => { + waiter.await; + Ok(()) + } + Err(_) => { + // This occurs if the engine has finished execution and has been + // dropped. + Err(AbortUpdateError::UpdateFinished) + } + } } fn put_repository(&mut self, bytes: BufList) -> Result<(), HttpError> {