From 3c12fb8b9b5e3f07e4f7630f2667012de6523573 Mon Sep 17 00:00:00 2001 From: Mark Logan <103447440+mystenmark@users.noreply.github.com> Date: Tue, 14 Jan 2025 09:48:57 -0800 Subject: [PATCH] [Cherry Pick] Fix race condition for transaction cache (#20875) Bug was (likely) as follows: Reader thread | Writer thread (state sync) invalidate tickets get ticket miss cache read database insert to cache insert to cache (racing) ticket is valid panic write to database The writer thread must insert to db first so that the db read cannot find an old value while holding a valid ticket --- .../src/execution_cache/cache_types.rs | 11 ++-- .../unit_tests/writeback_cache_tests.rs | 57 +++++++++++++++++++ .../src/execution_cache/writeback_cache.rs | 12 ++-- 3 files changed, 70 insertions(+), 10 deletions(-) diff --git a/crates/sui-core/src/execution_cache/cache_types.rs b/crates/sui-core/src/execution_cache/cache_types.rs index 209a861b54406..13f26947d1c5f 100644 --- a/crates/sui-core/src/execution_cache/cache_types.rs +++ b/crates/sui-core/src/execution_cache/cache_types.rs @@ -8,6 +8,7 @@ use std::sync::Arc; use std::{cmp::Ordering, hash::DefaultHasher}; use moka::sync::Cache as MokaCache; +use mysten_common::debug_fatal; use parking_lot::Mutex; use sui_types::base_types::SequenceNumber; @@ -292,10 +293,12 @@ where let mut entry = entry.value().lock(); check_ticket()?; - // Ticket expiry makes this assert impossible. - // TODO: relax to debug_assert? - assert!(!entry.is_newer_than(&value), "entry is newer than value"); - *entry = value; + // Ticket expiry should make this assert impossible. + if entry.is_newer_than(&value) { + debug_fatal!("entry is newer than value"); + } else { + *entry = value; + } } Ok(()) diff --git a/crates/sui-core/src/execution_cache/unit_tests/writeback_cache_tests.rs b/crates/sui-core/src/execution_cache/unit_tests/writeback_cache_tests.rs index af0b1213e2541..15f71ba9f890e 100644 --- a/crates/sui-core/src/execution_cache/unit_tests/writeback_cache_tests.rs +++ b/crates/sui-core/src/execution_cache/unit_tests/writeback_cache_tests.rs @@ -1339,3 +1339,60 @@ async fn latest_object_cache_race_test() { checker.join().unwrap(); invalidator.join().unwrap(); } + +#[tokio::test] +async fn test_transaction_cache_race() { + telemetry_subscribers::init_for_testing(); + + let mut s = Scenario::new(None, Arc::new(AtomicU32::new(0))).await; + let cache = s.cache.clone(); + let mut txns = Vec::new(); + + for i in 0..1000 { + let a = i * 4; + s.with_created(&[a]); + s.do_tx().await; + + let outputs = s.take_outputs(); + let tx = (*outputs.transaction).clone(); + let effects = outputs.effects.clone(); + + txns.push((tx, effects)); + } + + let barrier = Arc::new(std::sync::Barrier::new(2)); + + let t1 = { + let txns = txns.clone(); + let cache = cache.clone(); + let barrier = barrier.clone(); + std::thread::spawn(move || { + for (i, (tx, effects)) in txns.into_iter().enumerate() { + barrier.wait(); + // test both single and multi insert + if i % 2 == 0 { + cache.insert_transaction_and_effects(&tx, &effects); + } else { + cache.multi_insert_transaction_and_effects(&[VerifiedExecutionData::new( + tx, effects, + )]); + } + } + }) + }; + + let t2 = { + let txns = txns.clone(); + let cache = cache.clone(); + let barrier = barrier.clone(); + std::thread::spawn(move || { + for (tx, _) in txns { + barrier.wait(); + cache.get_transaction_block(tx.digest()); + } + }) + }; + + t1.join().unwrap(); + t2.join().unwrap(); +} diff --git a/crates/sui-core/src/execution_cache/writeback_cache.rs b/crates/sui-core/src/execution_cache/writeback_cache.rs index 0783ee082dff5..120fcc257c97d 100644 --- a/crates/sui-core/src/execution_cache/writeback_cache.rs +++ b/crates/sui-core/src/execution_cache/writeback_cache.rs @@ -2268,6 +2268,9 @@ impl StateSyncAPI for WritebackCache { transaction: &VerifiedTransaction, transaction_effects: &TransactionEffects, ) { + self.store + .insert_transaction_and_effects(transaction, transaction_effects) + .expect("db error"); self.cached .transactions .insert( @@ -2284,15 +2287,15 @@ impl StateSyncAPI for WritebackCache { Ticket::Write, ) .ok(); - self.store - .insert_transaction_and_effects(transaction, transaction_effects) - .expect("db error"); } fn multi_insert_transaction_and_effects( &self, transactions_and_effects: &[VerifiedExecutionData], ) { + self.store + .multi_insert_transaction_and_effects(transactions_and_effects.iter()) + .expect("db error"); for VerifiedExecutionData { transaction, effects, @@ -2315,8 +2318,5 @@ impl StateSyncAPI for WritebackCache { ) .ok(); } - self.store - .multi_insert_transaction_and_effects(transactions_and_effects.iter()) - .expect("db error"); } }