From 980ca70b0bb84cdbbe1d72508f03cf60cedd0c3d Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Mon, 21 Apr 2025 11:40:21 -0400 Subject: [PATCH 1/4] Introduce a `DatabaseLifecycleTracker`, which tracks database lifecycles Perhaps it should be called `DatabaseLifecycleManager`? This new object is responsible for tracking the lifecycle of a database, and for cleaning up after the database exits. In particular, it: - Unregisters the `Host` from the containing `HostController`. This was previously handled by an ad-hoc on-panic callback closure. - Aborts the database memory usage metrics reporter task. This was previously handled by a `Drop` method on `Host`. - Disconnects all connected WebSocket clients. Previously, this didn't happen at all, as per issue 2630. I've also added some commentary to the WebSocket actor loop. Follow-up commits will add tests once I've consulted with the team about how best to test this change. --- crates/client-api/src/routes/subscribe.rs | 58 ++++- crates/core/src/host/host_controller.rs | 287 +++++++++++++++++----- crates/core/src/host/module_host.rs | 54 ++-- 3 files changed, 326 insertions(+), 73 deletions(-) diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index 51d51120ec0..18351dcad68 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -226,6 +226,8 @@ async fn ws_client_actor_inner( let mut closed = false; let mut rx_buf = Vec::new(); + let mut connected_clients_watcher = client.module.lifecycle.lock().connected_clients_watcher.clone(); + let addr = client.module.info().database_identity; loop { @@ -256,13 +258,16 @@ async fn ws_client_actor_inner( // If we've received an incoming message, // grab it to handle in the next `match`. - message = ws.next() => match message { + incoming_ws_message = ws.next() => match incoming_ws_message { Some(Ok(m)) => Item::Message(ClientMessage::from_message(m)), Some(Err(error)) => { log::warn!("Websocket receive error: {}", error); + + // TODO: Should we error and drop the connection here? continue; } - // the client sent us a close frame + // the client sent us a close frame, or we closed the connection ourselves. + // Calls to `ws.close()` will eventually wind up here. None => break, }, @@ -305,6 +310,7 @@ async fn ws_client_actor_inner( let send_all = also_poll(send_all, make_progress(&mut current_message)); let t1 = Instant::now(); if let Err(error) = send_all.await { + // TODO: Should this drop the connection? log::warn!("Websocket send error: {error}") } let time = t1.elapsed(); @@ -335,6 +341,50 @@ async fn ws_client_actor_inner( continue; } + notif = connected_clients_watcher.changed() => { + match notif { + // The `DatabaseLifecycleTracker` for this database got dropped + // without first notifying clients to disconnect. + Err(_) => { + log::error!("`connected_clients_watcher` sender dropped without shutdown notification"); + + // Send a close frame while continuing to poll the `handle_queue`, + // to avoid deadlocks or delays due to enqueued futures holding resources. + let close = also_poll( + ws.close(Some(CloseFrame { code: CloseCode::Error, reason: "module exited without notification".into() })), + make_progress(&mut current_message), + ); + if let Err(e) = close.await { + log::warn!("error closing: {e:#}") + } + closed = true; + } + + Ok(()) => { + // Nasty dance to avoid holding a `tokio::sync::watch::Ref` across an await point: + // Grab the ref, clone the `String` error reason, then drop the ref as soon as possible. + let reason = if let Some(reason) = connected_clients_watcher.borrow_and_update().as_ref() { + reason.clone() + } else { + // Spurious wakeup; odd but not problematic. + continue; + }; + + // Send a close frame while continuing to poll the `handle_queue`, + // to avoid deadlocks or delays due to enqueued futures holding resources. + let close = also_poll( + ws.close(Some(CloseFrame { code: CloseCode::Error, reason: reason.into(), })), + make_progress(&mut current_message), + ); + if let Err(e) = close.await { + log::warn!("error closing: {e:#}") + } + closed = true; + } + } + continue; + } + // If it's time to send a ping... _ = liveness_check_interval.tick() => { // If we received a pong at some point, send a fresh ping. @@ -342,6 +392,7 @@ async fn ws_client_actor_inner( // Send a ping message while continuing to poll the `handle_queue`, // to avoid deadlocks or delays due to enqueued futures holding resources. if let Err(e) = also_poll(ws.send(WsMessage::Ping(Bytes::new())), make_progress(&mut current_message)).await { + // Should this drop the connection? log::warn!("error sending ping: {e:#}"); } continue; @@ -366,15 +417,18 @@ async fn ws_client_actor_inner( } Item::HandleResult(res) => { if let Err(e) = res { + // If it's an execution error, broadcast, then continue the loop. if let MessageHandleError::Execution(err) = e { log::error!("{err:#}"); let msg = serialize(err, client.config); if let Err(error) = ws.send(datamsg_to_wsmsg(msg)).await { + // Should we drop the connection? log::warn!("Websocket send error: {error}") } continue; } log::debug!("Client caused error on text message: {}", e); + // If it's any other kind of error, drop the connection. if let Err(e) = ws .close(Some(CloseFrame { code: CloseCode::Error, diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index de65aba2c61..b04eef80852 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -17,7 +17,7 @@ use crate::util::spawn_rayon; use anyhow::{anyhow, ensure, Context}; use async_trait::async_trait; use durability::{Durability, EmptyHistory}; -use log::{info, trace, warn}; +use log::{info, trace}; use parking_lot::{Mutex, RwLock}; use spacetimedb_data_structures::map::IntMap; use spacetimedb_durability::{self as durability, TxOffset}; @@ -27,11 +27,11 @@ use spacetimedb_paths::FromPathUnchecked; use spacetimedb_sats::hash::Hash; use std::future::Future; use std::ops::Deref; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; use tempfile::TempDir; use tokio::sync::{watch, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock as AsyncRwLock}; -use tokio::task::AbortHandle; +use tokio::task::JoinHandle; // TODO: // @@ -40,8 +40,12 @@ use tokio::task::AbortHandle; /// A shared mutable cell containing a module host and associated database. type HostCell = Arc>>; +type HostsInner = Mutex>; + /// The registry of all running hosts. -type Hosts = Arc>>; +type Hosts = Arc; + +type WeakHosts = Weak; pub type ExternalDurability = (Arc>, DiskSizeFn); @@ -276,15 +280,23 @@ impl HostController { { trace!("using database {}/{}", database.database_identity, replica_id); let module = self.get_or_launch_module_host(database, replica_id).await?; - let on_panic = self.unregister_fn(replica_id); - let result = tokio::task::spawn_blocking(move || f(&module.replica_ctx().relational_db)) - .await - .unwrap_or_else(|e| { - warn!("database operation panicked"); - on_panic(); - std::panic::resume_unwind(e.into_panic()) - }); - Ok(result) + let lifecycle = Arc::clone(&module.lifecycle); + + let result = tokio::task::spawn_blocking(move || f(&module.replica_ctx().relational_db)).await; + + match result { + Ok(result) => Ok(result), + Err(e) => { + // We never cancel this task + // (it's pretty clearly `spawn`ed and then immediately `await`ed right above), + // so no need to check `e.is_panic` or `e.is_cancelled`. + let panic_payload = e.into_panic(); + let err = DatabaseLifecycleTracker::panic_payload_to_error("while `using_database`", &panic_payload); + log::error!("{err:?}"); + lifecycle.lock().stop_database(err).await; + std::panic::resume_unwind(panic_payload) + } + } } /// Update the [`ModuleHost`] identified by `replica_id` to the given @@ -327,13 +339,7 @@ impl HostController { } }; let update_result = host - .update_module( - self.runtimes.clone(), - host_type, - program, - self.energy_monitor.clone(), - self.unregister_fn(replica_id), - ) + .update_module(self.runtimes.clone(), host_type, program, self.energy_monitor.clone()) .await?; *guard = Some(host); @@ -404,13 +410,7 @@ impl HostController { ); let program = load_program(&self.program_storage, program_hash).await?; let update_result = host - .update_module( - self.runtimes.clone(), - host_type, - program, - self.energy_monitor.clone(), - self.unregister_fn(replica_id), - ) + .update_module(self.runtimes.clone(), host_type, program, self.energy_monitor.clone()) .await?; match update_result { UpdateDatabaseResult::NoUpdateNeeded | UpdateDatabaseResult::UpdatePerformed => { @@ -439,6 +439,11 @@ impl HostController { let module = host.module.borrow().clone(); module.exit().await; let table_names = module.info().module_def.tables().map(|t| t.name.deref()); + // FIXME: We need to drop `module` before this, + // or at least the metrics abort handle it contains. + // Currently there's a race between this and the metrics reporter job. + // + // Should we be calling this in `DatabaseLifecycleTracker::stop_database` instead of here? db_metrics::data_size::remove_database_gauges(&module.info().database_identity, table_names); } } @@ -485,13 +490,9 @@ impl HostController { /// On-panic callback passed to [`ModuleHost`]s created by this controller. /// /// Removes the module with the given `replica_id` from this controller. - fn unregister_fn(&self, replica_id: u64) -> impl Fn() + Send + Sync + 'static { + fn unregister_handle(&self, replica_id: u64) -> UnregisterHandle { let hosts = Arc::downgrade(&self.hosts); - move || { - if let Some(hosts) = hosts.upgrade() { - hosts.lock().remove(&replica_id); - } - } + UnregisterHandle { hosts, replica_id } } async fn acquire_write_lock(&self, replica_id: u64) -> OwnedRwLockWriteGuard> { @@ -509,6 +510,35 @@ impl HostController { } } +/// Handle to unregister a [`Host`] from its containing [`HostController`]. +/// +/// Held by the [`DatabaseLifecycleTracker`]. +struct UnregisterHandle { + hosts: WeakHosts, + replica_id: u64, +} + +impl UnregisterHandle { + /// Unregister this handle's [`Host`] from its containing [`HostController`]. + fn unregister(&self) { + if let Some(hosts) = self.hosts.upgrade() { + hosts.lock().remove(&self.replica_id); + } + } + + /// Make a handle that doesn't reference any [`HostController`]. + /// + /// Calling [`Self::unregister`] on this handle will not do anything. + /// + /// Used by [`HostController::check_module_validity`]. + fn phony() -> Self { + Self { + hosts: std::sync::Weak::new(), + replica_id: 0, + } + } +} + fn stored_program_hash(db: &RelationalDB) -> anyhow::Result> { let meta = db.metadata()?; Ok(meta.map(|meta| meta.program_hash)) @@ -560,7 +590,7 @@ async fn make_module_host( scheduler: Scheduler, program: Program, energy_monitor: Arc, - unregister: impl Fn() + Send + Sync + 'static, + lifecycle: DatabaseLifecycleTrackerHandle, ) -> anyhow::Result<(Program, ModuleHost)> { spawn_rayon(move || { let module_host = match host_type { @@ -574,7 +604,7 @@ async fn make_module_host( let start = Instant::now(); let actor = runtimes.wasmtime.make_actor(mcc)?; trace!("wasmtime::make_actor blocked for {:?}", start.elapsed()); - ModuleHost::new(actor, unregister) + ModuleHost::new(actor, lifecycle) } }; Ok((program, module_host)) @@ -602,7 +632,7 @@ async fn launch_module( database: Database, replica_id: u64, program: Program, - on_panic: impl Fn() + Send + Sync + 'static, + lifecycle: DatabaseLifecycleTrackerHandle, relational_db: Arc, energy_monitor: Arc, replica_dir: ReplicaDir, @@ -622,7 +652,7 @@ async fn launch_module( scheduler.clone(), program, energy_monitor.clone(), - on_panic, + lifecycle, ) .await?; @@ -671,6 +701,155 @@ async fn update_module( } } +#[derive(Debug)] +pub enum DatabaseLifecycle { + Starting, + Running, + Stopped { reason: anyhow::Error }, +} + +pub type DatabaseLifecycleTrackerHandle = Arc>; + +pub struct DatabaseLifecycleTracker { + lifecycle: DatabaseLifecycle, + /// Sender half of a [`watch::channel`] which notifies all client connections that they should disconnect. + /// + /// When this database is deleted, paused, crashes, or otherwise stops running, + /// this will be set to `Some(err)`, + /// where `err` is the printed representation of the reason in the [`DatabaseLifecycle::Stopped`]. + /// + /// Client connection actors (see `spacetimedb_client_api::routes::subscribe`) + /// will poll the [`watch::Receiver`] side of this channel, and when it is set to `Some(err)`, + /// they will close their WebSocket connection with `err` as the message. + /// + /// This channel should not be used for anything other than disconnecting WebSocket clients. + /// Any other actions which need to be taken when the database changes lifecycle states + /// should use separate channels (or [`AbortHandles`], or some other mechanism). + /// This makes it much easier to reason about + /// what tasks are cancelled and what reseources are disposed when + /// and in what order. + connected_clients_disconnect: watch::Sender>, + + /// Receiver half of the `connected_clients_disconnect` channel. + /// + /// Each WebSocket client will hold a clone of this channel, and will poll it in their loop. + /// When this is set to `Some(err)`, they will close their WebSocket connection + /// with `err` as the message. + pub connected_clients_watcher: watch::Receiver>, + + /// Handle to the metrics collection task started via [`disk_monitor`]. + /// + /// The task collects metrics from the `replica_ctx`, and so stays alive as long + /// as the `replica_ctx` is live. + /// + /// This will be aborted when the database transitions to [`DatabaseLifecycle::Stopped`]. + metrics_task: Option>, + + /// [`HostController::unregister_handle`] to unregister this database. + unregister_handle: UnregisterHandle, +} + +impl Drop for DatabaseLifecycleTracker { + fn drop(&mut self) { + if !self.is_stopped() { + log::warn!("Dropping DatabaseLifecycleTracker whose state is not Stopped"); + // TODO: should we attempt to clean up here? Or possibly should we treat this as a hard error? + } + } +} + +impl DatabaseLifecycleTracker { + pub fn is_stopped(&self) -> bool { + matches!(self.get_lifecycle(), DatabaseLifecycle::Stopped { .. }) + } + + pub fn get_lifecycle(&self) -> &DatabaseLifecycle { + &self.lifecycle + } + + pub async fn stop_database(&mut self, reason: anyhow::Error) { + if let DatabaseLifecycle::Stopped { reason: old_reason } = &self.lifecycle { + log::warn!("DatabaseLifecycleTracker already stopped with reason {old_reason:?}, but attempted to stop again with reason {reason:?}"); + return; + } + + self.unregister_handle.unregister(); + + match self.metrics_task.take() { + None => log::warn!("metrics_task is None when stopping database with reason {reason:?}"), + Some(metrics_task) => { + metrics_task.abort(); + // It's not sufficient to just tell the metrics task to abort, + // we need to actually wait until it has fully stopped before we return from this method. + // Otherwise, we end up with a race condition between + // the metrics task noticing that it should abort + // and doing `db_metrics::data_size::remove_database_gauges` + // in `HostController::exit_module_host`. + let _ = metrics_task.await; + } + } + + self.connected_clients_disconnect.send(Some(format!("{reason:?}"))).expect("connected_clients_disconnect channel is closed, but we are holding its receiver as connected_clients_watcher!"); + } + + fn new(unregister_handle: UnregisterHandle) -> Self { + let (connected_clients_disconnect, connected_clients_watcher) = watch::channel(None); + Self { + lifecycle: DatabaseLifecycle::Starting, + metrics_task: None, + connected_clients_disconnect, + connected_clients_watcher, + unregister_handle, + } + } + + fn make_running(&mut self, metrics_task: JoinHandle<()>) { + match self.get_lifecycle() { + DatabaseLifecycle::Starting => { + self.metrics_task = Some(metrics_task); + self.lifecycle = DatabaseLifecycle::Running; + } + els => { + panic!("`make_running` on lifecycle manager in state {els:?}") + } + } + } + + /// Create a tracker that doesn't do any clean-up when the module dies. + /// + /// Used by [`HostController::check_module_validity`], + /// which constructs a short-lived in-memory database which requires no cleanup. + fn phony() -> Self { + Self::new(UnregisterHandle::phony()) + } + + /// Convert the result of [`tokio::task::JoinError::into_panic`] into an `anyhow::Error` + /// which can be passed to [`Self::stop_database`]. + /// + /// This must be a separate non-`async` method, rather than part of [`Self::stop_database`] + /// (or a wrapper around it) because `panic_payload` is not `Sync`, + /// and so the reference cannot be live across an `await` point. + /// Reference-typed function arguments are always live for the entire body of the function, + /// so the futures for `async` functions that accept a `panic_payload` argument are `!Send`, + /// even if the `panic_payload` is unused before all `await` points. + pub fn panic_payload_to_error( + context: &str, + panic_payload: &(dyn std::any::Any + Send + 'static), + ) -> anyhow::Error { + // According to the Rust docs + // https://doc.rust-lang.org/stable/std/panic/struct.PanicHookInfo.html#method.payload + // panic payloads resulting from `panic!` always result in a payload + // that is either `&'static str` or `String`. + if let Some(s) = panic_payload.downcast_ref::<&str>() { + anyhow::anyhow!("Host execution panicked {context}: {s}") + } else if let Some(s) = panic_payload.downcast_ref::() { + anyhow::anyhow!("Host execution panicked {context}: {s}") + } else { + anyhow::anyhow!("Host execution panicked {context} with payload of unknown type") + } + } +} + /// Encapsulates a database, associated module, and auxiliary state. struct Host { /// The [`ModuleHost`], providing the callable reducer API. @@ -687,11 +866,8 @@ struct Host { replica_ctx: Arc, /// Scheduler for repeating reducers, operating on the current `module`. scheduler: Scheduler, - /// Handle to the metrics collection task started via [`disk_monitor`]. - /// - /// The task collects metrics from the `replica_ctx`, and so stays alive as long - /// as the `replica_ctx` is live. The task is aborted when [`Host`] is dropped. - metrics_task: AbortHandle, + + lifecycle: Arc>, } impl Host { @@ -711,7 +887,11 @@ impl Host { durability, .. } = host_controller; - let on_panic = host_controller.unregister_fn(replica_id); + + let lifecycle_tracker = Arc::new(Mutex::new(DatabaseLifecycleTracker::new( + host_controller.unregister_handle(replica_id), + ))); + let replica_dir = data_dir.replica(replica_id); let (db, connected_clients) = match config.storage { @@ -756,7 +936,7 @@ impl Host { database, replica_id, program, - on_panic, + Arc::clone(&lifecycle_tracker), Arc::new(db), energy_monitor.clone(), replica_dir, @@ -794,13 +974,15 @@ impl Host { } scheduler_starter.start(&module_host)?; - let metrics_task = tokio::spawn(metric_reporter(replica_ctx.clone())).abort_handle(); + let metrics_task = tokio::spawn(metric_reporter(replica_ctx.clone())); + + lifecycle_tracker.lock().make_running(metrics_task); Ok(Host { module: watch::Sender::new(module_host), replica_ctx, scheduler, - metrics_task, + lifecycle: lifecycle_tracker, }) } @@ -838,6 +1020,8 @@ impl Host { None, )?; + let phony_lifecycle_tracker = DatabaseLifecycleTracker::phony(); + let (program, launched) = launch_module( database, 0, @@ -845,7 +1029,7 @@ impl Host { // No need to register a callback here: // proper publishes use it to unregister a panicked module, // but this module is not registered in the first place. - || log::error!("launch_module on_panic called for temporary publish in-memory instance"), + Arc::new(Mutex::new(phony_lifecycle_tracker)), Arc::new(db), Arc::new(NullEnergyMonitor), phony_replica_dir, @@ -879,7 +1063,6 @@ impl Host { host_type: HostType, program: Program, energy_monitor: Arc, - on_panic: impl Fn() + Send + Sync + 'static, ) -> anyhow::Result { let replica_ctx = &self.replica_ctx; let (scheduler, scheduler_starter) = Scheduler::open(self.replica_ctx.relational_db.clone()); @@ -891,7 +1074,7 @@ impl Host { scheduler.clone(), program, energy_monitor, - on_panic, + Arc::clone(&self.lifecycle), ) .await?; @@ -917,12 +1100,6 @@ impl Host { } } -impl Drop for Host { - fn drop(&mut self) { - self.metrics_task.abort(); - } -} - const STORAGE_METERING_INTERVAL: Duration = Duration::from_secs(15); /// Periodically collect gauge stats and update prometheus metrics. diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 085048f3ae6..c4336e5b50d 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -1,3 +1,4 @@ +use super::host_controller::{DatabaseLifecycleTracker, DatabaseLifecycleTrackerHandle}; use super::{ArgsTuple, InvalidReducerArguments, ReducerArgs, ReducerCallResult, ReducerId, ReducerOutcome, Scheduler}; use crate::client::{ClientActorId, ClientConnectionSender}; use crate::database_logger::{LogLevel, Record}; @@ -326,8 +327,12 @@ impl ModuleInstance for AutoReplacingModuleInstance { pub struct ModuleHost { pub info: Arc, inner: Arc, - /// Called whenever a reducer call on this host panics. - on_panic: Arc, + /// Whenever host execution panics while executing a reducer, + /// we'll signal the [`DatabaseLifecycleTracker`] to shut down this database. + /// + /// Note that this does not apply to in-WASM panics or error returns, + /// only to host panics. + pub lifecycle: DatabaseLifecycleTrackerHandle, } impl fmt::Debug for ModuleHost { @@ -419,7 +424,7 @@ impl DynModuleHost for HostControllerActor { pub struct WeakModuleHost { info: Arc, inner: Weak, - on_panic: Weak, + lifecycle: Weak>, } #[derive(Debug)] @@ -480,7 +485,7 @@ pub enum ClientConnectedError { } impl ModuleHost { - pub fn new(mut module: impl Module, on_panic: impl Fn() + Send + Sync + 'static) -> Self { + pub fn new(mut module: impl Module, lifecycle: DatabaseLifecycleTrackerHandle) -> Self { let info = module.info(); let instance_pool = LendingPool::new(); instance_pool.add_multiple(module.initial_instances()).unwrap(); @@ -488,8 +493,7 @@ impl ModuleHost { module: Arc::new(module), instance_pool, }); - let on_panic = Arc::new(on_panic); - ModuleHost { info, inner, on_panic } + ModuleHost { info, inner, lifecycle } } #[inline] @@ -518,14 +522,32 @@ impl ModuleHost { // Spawning a task allows to catch panics without proving to the // compiler that `dyn ModuleInstance` is unwind safe. - // If a reducer call panics, we **must** ensure to call `self.on_panic` + // If the host panics during a reducer call, we **must** ensure to notify `self.lifecycle` // so that the module is discarded by the host controller. - let result = tokio::spawn(async move { f(&mut *inst) }).await.unwrap_or_else(|e| { - log::warn!("reducer {reducer} panicked"); - (self.on_panic)(); - std::panic::resume_unwind(e.into_panic()); - }); - Ok(result) + // Note that this is not the same as the WASM execution panicking! + // If the WASM code exits with error during [`Self::call_reducer_inner`], + // either by returning `Err`, panicking or throwing an exception, + // then `f` here will return `Err(_)`, *not* panic, and so the `tokio::spawn` `JoinHandle` + // will resolve to `Ok(Err(_))`. + // In that case, we should not stop the database, + // as we can and will handle that error and recover. + let result = tokio::spawn(async move { f(&mut *inst) }).await; + + match result { + Ok(result) => Ok(result), + Err(e) => { + // We never cancel this task + // (it's pretty clearly `spawn`ed and then immediately `await`ed right above), + // so no need to check `e.is_panic` or `e.is_cancelled`. + let panic_payload = e.into_panic(); + let err = DatabaseLifecycleTracker::panic_payload_to_error( + &format!("while executing reducer {reducer}"), + &panic_payload, + ); + self.lifecycle.lock().stop_database(err).await; + std::panic::resume_unwind(panic_payload); + } + } } pub async fn disconnect_client(&self, client_id: ClientActorId) { @@ -994,7 +1016,7 @@ impl ModuleHost { WeakModuleHost { info: self.info.clone(), inner: Arc::downgrade(&self.inner), - on_panic: Arc::downgrade(&self.on_panic), + lifecycle: Arc::downgrade(&self.lifecycle), } } @@ -1010,11 +1032,11 @@ impl ModuleHost { impl WeakModuleHost { pub fn upgrade(&self) -> Option { let inner = self.inner.upgrade()?; - let on_panic = self.on_panic.upgrade()?; + let lifecycle = self.lifecycle.upgrade()?; Some(ModuleHost { info: self.info.clone(), inner, - on_panic, + lifecycle, }) } } From 7a313181b53c25ef2adf13f645d04b41eccab15d Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Wed, 11 Jun 2025 12:05:14 -0400 Subject: [PATCH 2/4] Ignore that lint --- crates/core/src/host/host_controller.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index ccbaf596b8c..72abcec5310 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -317,7 +317,11 @@ impl HostController { let panic_payload = e.into_panic(); let err = DatabaseLifecycleTracker::panic_payload_to_error("while `using_database`", &panic_payload); log::error!("{err:?}"); + + // FIXME: Find a way to avoid this `.await` while still stopping the metrics task. + #[allow(clippy::await_holding_lock)] lifecycle.lock().stop_database(err).await; + std::panic::resume_unwind(panic_payload) } } From f9b79fcef73e3dc41b353114bb57a647a1a78d61 Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Wed, 11 Jun 2025 13:24:11 -0400 Subject: [PATCH 3/4] *actually* ignore that lint, which has to be at the function level... --- crates/core/src/host/host_controller.rs | 9 +++++++-- crates/core/src/host/module_host.rs | 12 +++++++++--- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 72abcec5310..c2272a08409 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -302,6 +302,13 @@ impl HostController { F: FnOnce(&RelationalDB) -> T + Send + 'static, T: Send + 'static, { + // For the `stop_database` call when we encounter a panic. + // Frustratingly, it seems this lint can't be ignored on a per-expression basis, + // only at the function level or higher. + // TODO: Rework so that we don't have to ignore this lint, + // possibly using an async lock? + #![allow(clippy::await_holding_lock)] + trace!("using database {}/{}", database.database_identity, replica_id); let module = self.get_or_launch_module_host(database, replica_id).await?; let lifecycle = Arc::clone(&module.lifecycle); @@ -318,8 +325,6 @@ impl HostController { let err = DatabaseLifecycleTracker::panic_payload_to_error("while `using_database`", &panic_payload); log::error!("{err:?}"); - // FIXME: Find a way to avoid this `.await` while still stopping the metrics task. - #[allow(clippy::await_holding_lock)] lifecycle.lock().stop_database(err).await; std::panic::resume_unwind(panic_payload) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 0f375b7e5c4..32fc56f95e6 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -562,6 +562,13 @@ impl ModuleHost { F: FnOnce(&mut dyn ModuleInstance) -> R + Send + 'static, R: Send + 'static, { + // For the `stop_database` call when we encounter a panic. + // Frustratingly, it seems this lint can't be ignored on a per-expression basis, + // only at the function level or higher. + // TODO: Rework so that we don't have to ignore this lint, + // possibly using an async lock? + #![allow(clippy::await_holding_lock)] + // Record the time until our function starts running. let queue_timer = WORKER_METRICS .reducer_wait_time @@ -611,14 +618,13 @@ impl ModuleHost { match result { Ok(result) => Ok(result), Err(panic_payload) => { - // We never cancel this task - // (it's pretty clearly `spawn`ed and then immediately `await`ed right above), - // so no need to check `e.is_panic` or `e.is_cancelled`. let err = DatabaseLifecycleTracker::panic_payload_to_error( &format!("while executing reducer {reducer}"), &panic_payload, ); + self.lifecycle.lock().stop_database(err).await; + std::panic::resume_unwind(panic_payload); } } From 24181c26686fa9315c1961d07e8b01dced1f2424 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Thu, 12 Jun 2025 16:51:38 +0200 Subject: [PATCH 4/4] Update tracy A linux build system only exists for newer versions of the tracy application, and that claims the protocol is incompatible with `tracing-tracy` 0.10 --- Cargo.lock | 13 +++++++------ Cargo.toml | 2 +- crates/core/src/startup.rs | 2 +- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c9bd4e05985..47106fc4e2b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6959,9 +6959,9 @@ dependencies = [ [[package]] name = "tracing-tracy" -version = "0.10.5" +version = "0.11.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fca0df0602d3ac1cfd7413bd4f463800fe076bf6b88698722aa763fe1561248b" +checksum = "0eaa1852afa96e0fe9e44caa53dc0bd2d9d05e0f2611ce09f97f8677af56e4ba" dependencies = [ "tracing-core", "tracing-subscriber", @@ -6970,9 +6970,9 @@ dependencies = [ [[package]] name = "tracy-client" -version = "0.16.5" +version = "0.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "307e6b7030112fe9640fdd87988a40795549ba75c355f59485d14e6b444d2987" +checksum = "3927832d93178f979a970d26deed7b03510586e328f31b0f9ad7a73985b8332a" dependencies = [ "loom", "once_cell", @@ -6981,11 +6981,12 @@ dependencies = [ [[package]] name = "tracy-client-sys" -version = "0.22.2" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d104d610dfa9dd154535102cc9c6164ae1fa37842bc2d9e83f9ac82b0ae0882" +checksum = "c032d68a49d25d9012a864fef1c64ac17aee43c87e0477bf7301d8ae8bfea7b7" dependencies = [ "cc", + "windows-targets 0.52.6", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 3159af03756..ea98f7ef168 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -310,7 +310,7 @@ features = [ ] [workspace.dependencies.tracing-tracy] -version = "0.10.4" +version = "0.11.4" # We use the "ondemand" feature to allow connecting after the start, # and reconnecting, from the tracy client to the database. # TODO(George): Need to be able to remove "broadcast" in some build configurations. diff --git a/crates/core/src/startup.rs b/crates/core/src/startup.rs index 0dc94760328..f7c97d221b1 100644 --- a/crates/core/src/startup.rs +++ b/crates/core/src/startup.rs @@ -75,7 +75,7 @@ pub fn configure_tracing(opts: TracingOptions) { let env_filter_layer = conf_to_filter(opts.config); let tracy_layer = if opts.tracy { - Some(tracing_tracy::TracyLayer::new()) + Some(tracing_tracy::TracyLayer::default()) } else { None };