diff --git a/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs b/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs
index e1ea613cf28..b686fde5b88 100644
--- a/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs
+++ b/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs
@@ -260,57 +260,57 @@ macro_rules! event_cache_store_integration_tests_time {
             let store = get_event_cache_store().await.unwrap().into_event_cache_store();

             let acquired0 = store.try_take_leased_lock(0, "key", "alice").await.unwrap();
-            assert!(acquired0);
+            assert_eq!(acquired0, Some(0)); // first lock generation

             // Should extend the lease automatically (same holder).
             let acquired2 = store.try_take_leased_lock(300, "key", "alice").await.unwrap();
-            assert!(acquired2);
+            assert_eq!(acquired2, Some(0)); // same lock generation

             // Should extend the lease automatically (same holder + time is ok).
             let acquired3 = store.try_take_leased_lock(300, "key", "alice").await.unwrap();
-            assert!(acquired3);
+            assert_eq!(acquired3, Some(0)); // same lock generation

             // Another attempt at taking the lock should fail, because it's taken.
             let acquired4 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
-            assert!(!acquired4);
+            assert!(acquired4.is_none()); // not acquired

             // Even if we insist.
             let acquired5 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
-            assert!(!acquired5);
+            assert!(acquired5.is_none()); // not acquired

             // That's a nice test we got here, go take a little nap.
             tokio::time::sleep(Duration::from_millis(50)).await;

             // Still too early.
             let acquired55 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
-            assert!(!acquired55);
+            assert!(acquired55.is_none()); // not acquired

             // Ok you can take another nap then.
             tokio::time::sleep(Duration::from_millis(250)).await;

             // At some point, we do get the lock.
             let acquired6 = store.try_take_leased_lock(0, "key", "bob").await.unwrap();
-            assert!(acquired6);
+            assert_eq!(acquired6, Some(1)); // new lock generation!

             tokio::time::sleep(Duration::from_millis(1)).await;

             // The other gets it almost immediately too.
             let acquired7 = store.try_take_leased_lock(0, "key", "alice").await.unwrap();
-            assert!(acquired7);
+            assert_eq!(acquired7, Some(2)); // new lock generation!

             tokio::time::sleep(Duration::from_millis(1)).await;

-            // But when we take a longer lease...
+            // But when we take a longer lease…
             let acquired8 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
-            assert!(acquired8);
+            assert_eq!(acquired8, Some(3)); // new lock generation!

             // It blocks the other user.
             let acquired9 = store.try_take_leased_lock(300, "key", "alice").await.unwrap();
-            assert!(!acquired9);
+            assert!(acquired9.is_none()); // not acquired

             // We can hold onto our lease.
             let acquired10 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
-            assert!(acquired10);
+            assert_eq!(acquired10, Some(3)); // same lock generation
         }
     }
 };
diff --git a/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs b/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs
index 1b5debbccee..2d4027fdae8 100644
--- a/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs
+++ b/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs
@@ -16,7 +16,8 @@ use std::{collections::HashMap, num::NonZeroUsize, sync::RwLock as StdRwLock, ti

 use async_trait::async_trait;
 use matrix_sdk_common::{
-    ring_buffer::RingBuffer, store_locks::memory_store_helper::try_take_leased_lock,
+    ring_buffer::RingBuffer,
+    store_locks::{memory_store_helper::try_take_leased_lock, LockGeneration},
 };
 use ruma::{MxcUri, OwnedMxcUri};

@@ -30,7 +31,7 @@ use crate::media::{MediaRequestParameters, UniqueKey as _};
 #[derive(Debug)]
 pub struct MemoryStore {
     media: StdRwLock<RingBuffer<(OwnedMxcUri, String, Vec<u8>)>>,
-    leases: StdRwLock<HashMap<String, (String, Instant)>>,
+    leases: StdRwLock<HashMap<String, (String, Instant, LockGeneration)>>,
 }

 // SAFETY: `new_unchecked` is safe because 20 is not zero.
@@ -62,7 +63,7 @@ impl EventCacheStore for MemoryStore {
         lease_duration_ms: u32,
         key: &str,
         holder: &str,
-    ) -> Result<bool, Self::Error> {
+    ) -> Result<Option<LockGeneration>, Self::Error> {
         Ok(try_take_leased_lock(&self.leases, lease_duration_ms, key, holder))
     }
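The semantics of the new return value are easiest to see on the shared in-memory helper itself, which `MemoryStore` delegates to. A minimal standalone sketch (not part of the patch), assuming the `memory_store_helper` API shown later in the `store_locks.rs` hunks:

```rust
use std::{collections::HashMap, sync::RwLock};

use matrix_sdk_common::store_locks::memory_store_helper::try_take_leased_lock;

fn main() {
    let leases = RwLock::new(HashMap::new());

    // Vacant lease: acquired with the first generation.
    assert_eq!(try_take_leased_lock(&leases, 300, "key", "alice"), Some(0));

    // Extending one's own lease keeps the generation stable...
    assert_eq!(try_take_leased_lock(&leases, 300, "key", "alice"), Some(0));

    // ...while a concurrent holder is rejected as long as the lease is alive.
    assert_eq!(try_take_leased_lock(&leases, 300, "key", "bob"), None);
}
```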
diff --git a/crates/matrix-sdk-base/src/event_cache/store/mod.rs b/crates/matrix-sdk-base/src/event_cache/store/mod.rs
index 76b7032c195..cbb88c7e9a3 100644
--- a/crates/matrix-sdk-base/src/event_cache/store/mod.rs
+++ b/crates/matrix-sdk-base/src/event_cache/store/mod.rs
@@ -19,7 +19,14 @@
 //! into the event cache for the actual storage. By default this brings an
 //! in-memory store.

-use std::{fmt, ops::Deref, str::Utf8Error, sync::Arc};
+use std::{
+    error::Error,
+    fmt,
+    ops::{Deref, DerefMut},
+    result::Result as StdResult,
+    str::Utf8Error,
+    sync::{Arc, Mutex},
+};

 #[cfg(any(test, feature = "testing"))]
 #[macro_use]
@@ -28,7 +35,8 @@ mod memory_store;
 mod traits;

 use matrix_sdk_common::store_locks::{
-    BackingStore, CrossProcessStoreLock, CrossProcessStoreLockGuard, LockStoreError,
+    BackingStore, CrossProcessStoreLock, CrossProcessStoreLockGuard, LockGeneration,
+    LockStoreError, FIRST_LOCK_GENERATION,
 };
 pub use matrix_sdk_store_encryption::Error as StoreEncryptionError;

@@ -43,11 +51,23 @@ pub use self::{
 #[derive(Clone)]
 pub struct EventCacheStoreLock {
     /// The inner cross-process lock that is used to lock the `EventCacheStore`.
-    cross_process_lock: CrossProcessStoreLock<LockableEventCacheStore>,
+    cross_process_lock: CrossProcessStoreLock<Arc<LockableEventCacheStore>>,
+
+    /// A reference to the `LockableEventCacheStore`.
+    ///
+    /// This is used to get access to extra API on `LockableEventCacheStore`,
+    /// not restricted to the `BackingStore` trait.
+    ///
+    /// This is necessary because `CrossProcessStoreLock` doesn't provide a way
+    /// to get a reference to the inner backing store. And that's okay.
+    lockable_store: Arc<LockableEventCacheStore>,

     /// The store itself.
     ///
-    /// That's the only place where the store exists.
+    /// The store lives here, and inside `Self::lockable_store`.
+    ///
+    /// This is necessary because the lock methods return a guard that contains
+    /// a reference to the store.
     store: Arc<DynEventCacheStore>,
 }

@@ -68,22 +88,56 @@ impl EventCacheStoreLock {
         S: IntoEventCacheStore,
     {
         let store = store.into_event_cache_store();
+        let lockable_store = Arc::new(LockableEventCacheStore::new(store.clone()));

         Self {
             cross_process_lock: CrossProcessStoreLock::new(
-                LockableEventCacheStore(store.clone()),
+                lockable_store.clone(),
                 "default".to_owned(),
                 holder,
             ),
+            lockable_store,
             store,
         }
     }

     /// Acquire a spin lock (see [`CrossProcessStoreLock::spin_lock`]).
-    pub async fn lock(&self) -> Result<EventCacheStoreLockGuard<'_>, LockStoreError> {
+    ///
+    /// It doesn't check whether the lock has been poisoned or not.
+    /// A lock has been poisoned if it's been acquired from another holder.
+    pub async fn lock_unchecked(&self) -> Result<EventCacheStoreLockGuard<'_>, LockStoreError> {
+        match self.lock().await? {
+            Ok(guard) => Ok(guard),
+            Err(poison_error) => Ok(poison_error.into_inner()),
+        }
+    }
+
+    /// Acquire a spin lock (see [`CrossProcessStoreLock::spin_lock`]).
+    ///
+    /// It **does** check whether the lock has been poisoned or not. Use
+    /// [`EventCacheStoreLock::lock_unchecked`] if you don't want to check. A
+    /// lock has been poisoned if it's been acquired from another holder.
+    ///
+    /// This method returns a first `Result` to handle the locking error. The
+    /// `Ok` variant contains another `Result` to handle the lock poisoning.
+    pub async fn lock(
+        &self,
+    ) -> Result<
+        Result<
+            EventCacheStoreLockGuard<'_>,
+            EventCacheStoreLockPoisonError<EventCacheStoreLockGuard<'_>>,
+        >,
+        LockStoreError,
+    > {
         let cross_process_lock_guard = self.cross_process_lock.spin_lock(None).await?;
+        let event_cache_store_lock_guard =
+            EventCacheStoreLockGuard { cross_process_lock_guard, store: self.store.deref() };

-        Ok(EventCacheStoreLockGuard { cross_process_lock_guard, store: self.store.deref() })
+        Ok(if self.lockable_store.is_poisoned() {
+            Err(EventCacheStoreLockPoisonError(event_cache_store_lock_guard))
+        } else {
+            Ok(event_cache_store_lock_guard)
+        })
     }
 }

@@ -114,12 +168,41 @@ impl<'a> Deref for EventCacheStoreLockGuard<'a> {
     }
 }

+/// A type of error which can be returned whenever a cross-process lock is
+/// acquired.
+///
+/// [`EventCacheStoreLock`] is poisoned whenever the lock is acquired by a
+/// holder other than the current one, i.e. when the previous lock was held
+/// by another process.
+pub struct EventCacheStoreLockPoisonError<T>(T);
+
+impl<T> EventCacheStoreLockPoisonError<T> {
+    fn into_inner(self) -> T {
+        self.0
+    }
+}
+
+#[cfg(not(tarpaulin_include))]
+impl<T> fmt::Debug for EventCacheStoreLockPoisonError<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("EventCacheStoreLockPoisonError").finish_non_exhaustive()
+    }
+}
+
+impl<T> fmt::Display for EventCacheStoreLockPoisonError<T> {
+    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
+        "Poisoned lock: lock has been acquired from another process".fmt(formatter)
+    }
+}
+
+impl<T> Error for EventCacheStoreLockPoisonError<T> {}
+
 /// Event cache store specific error type.
 #[derive(Debug, thiserror::Error)]
 pub enum EventCacheStoreError {
     /// An error happened in the underlying database backend.
     #[error(transparent)]
-    Backend(Box<dyn std::error::Error + Send + Sync>),
+    Backend(Box<dyn Error + Send + Sync>),

     /// The store is locked with a passphrase and an incorrect passphrase
     /// was given.
@@ -153,19 +236,35 @@ impl EventCacheStoreError {
     #[inline]
     pub fn backend<E>(error: E) -> Self
     where
-        E: std::error::Error + Send + Sync + 'static,
+        E: Error + Send + Sync + 'static,
     {
         Self::Backend(Box::new(error))
     }
 }

 /// An `EventCacheStore` specific result type.
-pub type Result<T, E = EventCacheStoreError> = std::result::Result<T, E>;
+pub type Result<T, E = EventCacheStoreError> = StdResult<T, E>;

 /// A type that wraps the [`EventCacheStore`] but implements [`BackingStore`]
 /// to make it usable inside the cross-process lock.
 #[derive(Clone, Debug)]
-struct LockableEventCacheStore(Arc<DynEventCacheStore>);
+struct LockableEventCacheStore {
+    store: Arc<DynEventCacheStore>,
+    generation_and_is_poisoned: Arc<Mutex<(LockGeneration, bool)>>,
+}
+
+impl LockableEventCacheStore {
+    fn new(store: Arc<DynEventCacheStore>) -> Self {
+        Self {
+            store,
+            generation_and_is_poisoned: Arc::new(Mutex::new((FIRST_LOCK_GENERATION, false))),
+        }
+    }
+
+    fn is_poisoned(&self) -> bool {
+        self.generation_and_is_poisoned.lock().unwrap().1
+    }
+}

 #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
 #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
@@ -177,7 +276,170 @@ impl BackingStore for LockableEventCacheStore {
         lease_duration_ms: u32,
         key: &str,
         holder: &str,
-    ) -> std::result::Result<bool, Self::LockError> {
-        self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
+    ) -> StdResult<bool, Self::LockError> {
+        let lock_generation =
+            self.store.try_take_leased_lock(lease_duration_ms, key, holder).await?;
+
+        Ok(match lock_generation {
+            // Lock hasn't been acquired.
+            None => false,
+
+            // Lock has been acquired, and we have a generation.
+            Some(generation) => {
+                let mut guard = self.generation_and_is_poisoned.lock().unwrap();
+                let (last_generation, is_poisoned) = guard.deref_mut();
+
+                // The lock is considered poisoned if it's been acquired
+                // from another holder. If the lock is acquired from another
+                // holder, its generation is incremented by one. So, if
+                // `lock_generation` differs from `last_generation`, it means
+                // the lock has been acquired from another holder, and it is
+                // consequently poisoned; otherwise it is not poisoned.
+                //
+                // The initial value for `last_generation` **must be**
+                // `FIRST_LOCK_GENERATION`.
+                *is_poisoned = generation != *last_generation;
+                *last_generation = generation;
+
+                true
+            }
+        })
+    }
+}
+
+#[cfg(all(test, not(target_arch = "wasm32")))] // because time is a thing
+mod tests {
+    use std::{error::Error, fmt, sync::Arc, time::Duration};
+
+    use assert_matches::assert_matches;
+    use matrix_sdk_common::store_locks::MAX_BACKOFF_MS;
+    use matrix_sdk_test::async_test;
+    use ruma::user_id;
+    use tokio::time::sleep;
+
+    use super::{EventCacheStoreError, EventCacheStoreLockPoisonError, MemoryStore};
+    use crate::{store::StoreConfig, test_utils::logged_in_base_client_with_store_config};
+
+    #[async_test]
+    async fn test_not_poisoned_lock() {
+        let client = logged_in_base_client_with_store_config(
+            Some(user_id!("@client:sdk.rust")),
+            StoreConfig::new("holderA".to_owned()),
+        )
+        .await;
+
+        let event_cache_store_lock = client.event_cache_store();
+
+        // `lock_unchecked` is okay.
+        let guard = event_cache_store_lock.lock_unchecked().await;
+        assert!(guard.is_ok()); // lock has been acquired and may or may not be poisoned
+
+        // `lock` is okay.
+        let guard = event_cache_store_lock.lock().await;
+
+        assert!(guard.is_ok()); // lock has been acquired
+        assert!(guard.unwrap().is_ok()); // lock is not poisoned
+    }
+
+    #[async_test]
+    async fn test_poisoned_lock() {
+        // Use the same memory store between clients A and B.
+        let memory_store = Arc::new(MemoryStore::new());
+
+        let client_a = logged_in_base_client_with_store_config(
+            Some(user_id!("@client_a:sdk.rust")),
+            StoreConfig::new("holderA".to_owned()).event_cache_store(memory_store.clone()),
+        )
+        .await;
+
+        let client_b = logged_in_base_client_with_store_config(
+            Some(user_id!("@client_b:sdk.rust")),
+            StoreConfig::new("holderB".to_owned()).event_cache_store(memory_store),
+        )
+        .await;
+
+        let event_cache_store_lock_a = client_a.event_cache_store();
+        let event_cache_store_lock_b = client_b.event_cache_store();
+
+        // Client A can take the lock because no one has taken it so far.
+        {
+            // `lock` is okay.
+            let guard = event_cache_store_lock_a.lock().await;
+
+            assert!(guard.is_ok()); // lock has been acquired
+            assert!(guard.unwrap().is_ok()); // lock is not poisoned
+        }
+
+        sleep(Duration::from_millis(MAX_BACKOFF_MS as u64 + 100)).await;
+
+        // Client B can take the lock since all locks from A are expired, but
+        // now, B is poisoned because the content of the event cache might have
+        // been modified (if someone takes a lock, it's probably for a good
+        // reason, right?).
+        {
+            // `lock` is okay.
+            let guard = event_cache_store_lock_b.lock().await;
+
+            assert!(guard.is_ok()); // lock has been acquired
+            assert!(guard.unwrap().is_err()); // lock is poisoned!
+        }
+
+        sleep(Duration::from_millis(MAX_BACKOFF_MS as u64 + 100)).await;
+
+        // Client A can take the lock since all locks from B are expired, but
+        // now, A is poisoned. Let's not test `lock` but `lock_unchecked` this
+        // time.
+        {
+            // `lock_unchecked` is okay.
+            let guard = event_cache_store_lock_a.lock_unchecked().await;
+
+            assert!(guard.is_ok()); // lock has been acquired and might be
+                                    // poisoned, we don't know with this method
+        }
+
+        // Client A can still take the lock because it is holding it. The lock
+        // is no longer poisoned.
+        {
+            // `lock` is okay.
+            let guard = event_cache_store_lock_a.lock().await;
+
+            assert!(guard.is_ok()); // lock has been acquired
+            assert!(guard.unwrap().is_ok()); // lock is not poisoned
+        }
+    }
+
+    #[test]
+    fn test_poison_error() {
+        let error = EventCacheStoreLockPoisonError(42);
+
+        fn error_to_string<E>(error: E) -> String
+        where
+            E: Error,
+        {
+            error.to_string()
+        }
+
+        assert_eq!(
+            error_to_string(error),
+            "Poisoned lock: lock has been acquired from another process".to_owned(),
+        );
+    }
+
+    #[test]
+    fn test_backend_error() {
+        #[derive(Debug)]
+        struct Foo;
+
+        impl fmt::Display for Foo {
+            fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
+                formatter.debug_struct("Foo").finish()
+            }
+        }
+
+        impl Error for Foo {}
+
+        let error = EventCacheStoreError::backend(Foo);
+
+        assert_matches!(error, EventCacheStoreError::Backend(_));
+    }
+}
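Callers decide per call site whether poisoning matters. A sketch of both flavours (the surrounding function and the rebuild step are hypothetical, not part of this patch):

```rust
use matrix_sdk_base::event_cache::store::EventCacheStoreLock;
use matrix_sdk_common::store_locks::LockStoreError;

// Hypothetical: stands in for rebuilding whatever in-memory state was
// derived from the store (indices, deduplicated timelines, ...).
fn rebuild_in_memory_state() {}

async fn with_poison_check(lock: &EventCacheStoreLock) -> Result<(), LockStoreError> {
    // `lock()` reports poisoning; the `Err` variant still carries a usable
    // guard, which is what `lock_unchecked()` unwraps for you.
    match lock.lock().await? {
        Ok(_guard) => {
            // We were the previous holder: derived state is still valid.
        }
        Err(_poison_error) => {
            // Another process acquired the lock since our last access.
            rebuild_in_memory_state();
        }
    }

    Ok(())
}
```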
diff --git a/crates/matrix-sdk-base/src/event_cache/store/traits.rs b/crates/matrix-sdk-base/src/event_cache/store/traits.rs
index e52ad8b8b2e..e62bf8123bb 100644
--- a/crates/matrix-sdk-base/src/event_cache/store/traits.rs
+++ b/crates/matrix-sdk-base/src/event_cache/store/traits.rs
@@ -15,7 +15,7 @@
 use std::{fmt, sync::Arc};

 use async_trait::async_trait;
-use matrix_sdk_common::AsyncTraitDeps;
+use matrix_sdk_common::{store_locks::LockGeneration, AsyncTraitDeps};
 use ruma::MxcUri;

 use super::EventCacheStoreError;
@@ -35,7 +35,7 @@ pub trait EventCacheStore: AsyncTraitDeps {
         lease_duration_ms: u32,
         key: &str,
         holder: &str,
-    ) -> Result<bool, Self::Error>;
+    ) -> Result<Option<LockGeneration>, Self::Error>;

     /// Add a media file's content in the media store.
     ///
@@ -127,7 +127,7 @@ impl<T: EventCacheStore> EventCacheStore for EraseEventCacheStoreError<T> {
         lease_duration_ms: u32,
         key: &str,
         holder: &str,
-    ) -> Result<bool, Self::Error> {
+    ) -> Result<Option<LockGeneration>, Self::Error> {
         self.0.try_take_leased_lock(lease_duration_ms, key, holder).await.map_err(Into::into)
     }
diff --git a/crates/matrix-sdk-base/src/test_utils.rs b/crates/matrix-sdk-base/src/test_utils.rs
index d64928d3c24..11c5e64d843 100644
--- a/crates/matrix-sdk-base/src/test_utils.rs
+++ b/crates/matrix-sdk-base/src/test_utils.rs
@@ -23,11 +23,24 @@ use crate::{store::StoreConfig, BaseClient, SessionMeta};
 /// Create a [`BaseClient`] with the given user id, if provided, or a hardcoded
 /// one otherwise.
 pub(crate) async fn logged_in_base_client(user_id: Option<&UserId>) -> BaseClient {
-    let client = BaseClient::with_store_config(StoreConfig::new(
-        "cross-process-store-locks-holder-name".to_owned(),
-    ));
+    logged_in_base_client_with_store_config(
+        user_id,
+        StoreConfig::new("cross-process-store-locks-holder-name".to_owned()),
+    )
+    .await
+}
+
+/// Create a [`BaseClient`] with the given user id, if provided, or a hardcoded
+/// one otherwise, and with a store config.
+pub(crate) async fn logged_in_base_client_with_store_config(
+    user_id: Option<&UserId>,
+    store_config: StoreConfig,
+) -> BaseClient {
+    let client = BaseClient::with_store_config(store_config);
+
     let user_id =
         user_id.map(|user_id| user_id.to_owned()).unwrap_or_else(|| owned_user_id!("@u:e.uk"));
+
     client
         .set_session_meta(
             SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
@@ -36,5 +49,6 @@ pub(crate) async fn logged_in_base_client(user_id: Option<&UserId>) -> BaseClien
         )
         .await
         .expect("set_session_meta failed!");
+
     client
 }
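For a store implementation outside this repository, the new contract is: return `Some(generation)` when the lease is acquired or extended, `None` otherwise, bumping the generation whenever the holder changes. A sketch of how a custom store could delegate to the shared helper, exactly as the in-memory store above does (`MyStore` is hypothetical):

```rust
use std::{collections::HashMap, sync::RwLock, time::Instant};

use matrix_sdk_common::store_locks::{
    memory_store_helper::try_take_leased_lock, LockGeneration,
};

// Hypothetical custom store; only the lease-lock concern is sketched.
struct MyStore {
    leases: RwLock<HashMap<String, (String, Instant, LockGeneration)>>,
}

impl MyStore {
    // Mirrors the updated `EventCacheStore::try_take_leased_lock` signature,
    // minus `async` and the store-specific error type.
    fn try_take_leased_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> Option<LockGeneration> {
        try_take_leased_lock(&self.leases, lease_duration_ms, key, holder)
    }
}
```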
diff --git a/crates/matrix-sdk-common/src/store_locks.rs b/crates/matrix-sdk-common/src/store_locks.rs
index 13350c1fa25..8ee86b06983 100644
--- a/crates/matrix-sdk-common/src/store_locks.rs
+++ b/crates/matrix-sdk-common/src/store_locks.rs
@@ -39,6 +39,7 @@
 use std::{
     error::Error,
+    ops::Deref,
     sync::{
         atomic::{self, AtomicU32},
         Arc,
@@ -51,9 +52,16 @@
 use tracing::{debug, error, info, instrument, trace};

 use crate::{
     executor::{spawn, JoinHandle},
-    SendOutsideWasm,
+    SendOutsideWasm, SyncOutsideWasm,
 };

+/// A lock generation is an integer incremented each time the lock is taken
+/// by another holder. This is not used by all cross-process locks.
+pub type LockGeneration = u64;
+
+/// The first lock generation value (see [`LockGeneration`]).
+pub const FIRST_LOCK_GENERATION: LockGeneration = 0;
+
 /// Backing store for a cross-process lock.
 #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
 #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
@@ -69,6 +77,24 @@ pub trait BackingStore {
     ) -> Result<bool, Self::LockError>;
 }

+#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
+impl<T> BackingStore for Arc<T>
+where
+    T: BackingStore + SendOutsideWasm + SyncOutsideWasm,
+{
+    type LockError = T::LockError;
+
+    async fn try_lock(
+        &self,
+        lease_duration_ms: u32,
+        key: &str,
+        holder: &str,
+    ) -> Result<bool, Self::LockError> {
+        self.deref().try_lock(lease_duration_ms, key, holder).await
+    }
+}
+
 /// Small state machine to handle wait times.
 #[derive(Clone, Debug)]
 enum WaitingTime {
@@ -351,16 +377,23 @@ mod tests {

     use super::{
         memory_store_helper::try_take_leased_lock, BackingStore, CrossProcessStoreLock,
-        CrossProcessStoreLockGuard, LockStoreError, EXTEND_LEASE_EVERY_MS,
+        CrossProcessStoreLockGuard, LockGeneration, LockStoreError, EXTEND_LEASE_EVERY_MS,
     };

+    type HolderExpirationGeneration = (String, Instant, LockGeneration);
+
     #[derive(Clone, Default)]
     struct TestStore {
-        leases: Arc<RwLock<HashMap<String, (String, Instant)>>>,
+        leases: Arc<RwLock<HashMap<String, HolderExpirationGeneration>>>,
     }

     impl TestStore {
-        fn try_take_leased_lock(&self, lease_duration_ms: u32, key: &str, holder: &str) -> bool {
+        fn try_take_leased_lock(
+            &self,
+            lease_duration_ms: u32,
+            key: &str,
+            holder: &str,
+        ) -> Option<LockGeneration> {
             try_take_leased_lock(&self.leases, lease_duration_ms, key, holder)
         }
     }
@@ -380,7 +413,7 @@ mod tests {
             key: &str,
             holder: &str,
         ) -> Result<bool, Self::LockError> {
-            Ok(self.try_take_leased_lock(lease_duration_ms, key, holder))
+            Ok(self.try_take_leased_lock(lease_duration_ms, key, holder).is_some())
         }
     }
@@ -506,36 +539,45 @@ pub mod memory_store_helper {
         time::{Duration, Instant},
     };

+    use super::{LockGeneration, FIRST_LOCK_GENERATION};
+
+    type HolderExpirationGeneration = (String, Instant, LockGeneration);
+
+    /// Try to acquire or to extend the lock.
+    ///
+    /// Return `Some` if the lock has been acquired (or extended). It contains
+    /// the lock generation.
     pub fn try_take_leased_lock(
-        leases: &RwLock<HashMap<String, (String, Instant)>>,
+        leases: &RwLock<HashMap<String, HolderExpirationGeneration>>,
         lease_duration_ms: u32,
         key: &str,
         holder: &str,
-    ) -> bool {
+    ) -> Option<LockGeneration> {
         let now = Instant::now();
         let expiration = now + Duration::from_millis(lease_duration_ms.into());

         match leases.write().unwrap().entry(key.to_owned()) {
             // There is an existing holder.
             Entry::Occupied(mut entry) => {
-                let (current_holder, current_expiration) = entry.get_mut();
+                let (current_holder, current_expiration, current_generation) = entry.get_mut();

                 if current_holder == holder {
                     // We had the lease before, extend it.
                     *current_expiration = expiration;

-                    true
+                    Some(*current_generation)
                 } else {
                     // We didn't have it.
                     if *current_expiration < now {
                         // Steal it!
                         *current_holder = holder.to_owned();
                         *current_expiration = expiration;
+                        *current_generation += 1;

-                        true
+                        Some(*current_generation)
                     } else {
                         // We tried our best.
-                        false
+                        None
                     }
                 }
             }
@@ -545,9 +587,10 @@ pub mod memory_store_helper {
             Entry::Vacant(entry) => {
                 entry.insert((
                     holder.to_owned(),
                     Instant::now() + Duration::from_millis(lease_duration_ms.into()),
+                    FIRST_LOCK_GENERATION,
                 ));

-                true
+                Some(FIRST_LOCK_GENERATION)
             }
         }
     }
 }
diff --git a/crates/matrix-sdk-crypto/src/store/memorystore.rs b/crates/matrix-sdk-crypto/src/store/memorystore.rs
index 6b177ea2641..1262864bcda 100644
--- a/crates/matrix-sdk-crypto/src/store/memorystore.rs
+++ b/crates/matrix-sdk-crypto/src/store/memorystore.rs
@@ -20,7 +20,7 @@ use std::{
 };

 use async_trait::async_trait;
-use matrix_sdk_common::store_locks::memory_store_helper::try_take_leased_lock;
+use matrix_sdk_common::store_locks::{memory_store_helper::try_take_leased_lock, LockGeneration};
 use ruma::{
     events::secret::request::SecretName, DeviceId, OwnedDeviceId, OwnedRoomId, OwnedTransactionId,
     OwnedUserId, RoomId, TransactionId, UserId,
@@ -90,7 +90,7 @@ pub struct MemoryStore {
     key_requests_by_info: StdRwLock<HashMap<String, OwnedTransactionId>>,
     direct_withheld_info: StdRwLock<HashMap<OwnedRoomId, HashMap<String, RoomKeyWithheldEvent>>>,
     custom_values: StdRwLock<HashMap<String, Vec<u8>>>,
-    leases: StdRwLock<HashMap<String, (String, Instant)>>,
+    leases: StdRwLock<HashMap<String, (String, Instant, LockGeneration)>>,
     secret_inbox: StdRwLock<HashMap<String, Vec<GossippedSecret>>>,
     backup_keys: RwLock<BackupKeys>,
     next_batch_token: RwLock<Option<String>>,
@@ -632,7 +632,7 @@ impl CryptoStore for MemoryStore {
         key: &str,
         holder: &str,
     ) -> Result<bool> {
-        Ok(try_take_leased_lock(&self.leases, lease_duration_ms, key, holder))
+        Ok(try_take_leased_lock(&self.leases, lease_duration_ms, key, holder).is_some())
     }
 }
diff --git a/crates/matrix-sdk-sqlite/migrations/event_cache_store/003_lease_locks_with_generation.sql b/crates/matrix-sdk-sqlite/migrations/event_cache_store/003_lease_locks_with_generation.sql
new file mode 100644
index 00000000000..01fb4bdda11
--- /dev/null
+++ b/crates/matrix-sdk-sqlite/migrations/event_cache_store/003_lease_locks_with_generation.sql
@@ -0,0 +1 @@
+ALTER TABLE "lease_locks" ADD COLUMN "generation" INTEGER NOT NULL DEFAULT 0;
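Note that the crypto store keeps the old boolean semantics by calling `.is_some()` on the helper's result; only the event cache store inspects the generation. What the generation buys becomes visible when a lease expires and is stolen. A small standalone sketch (not part of the patch) against the helper:

```rust
use std::{collections::HashMap, sync::RwLock, thread::sleep, time::Duration};

use matrix_sdk_common::store_locks::memory_store_helper::try_take_leased_lock;

fn main() {
    let leases = RwLock::new(HashMap::new());

    // "alice" takes the lock with a very short lease.
    assert_eq!(try_take_leased_lock(&leases, 10, "key", "alice"), Some(0));

    // Once the lease has expired, "bob" steals it. Stealing is what bumps
    // the generation, and a bumped generation is what the event cache store
    // later reports as poisoning.
    sleep(Duration::from_millis(20));
    assert_eq!(try_take_leased_lock(&leases, 10, "key", "bob"), Some(1));

    // "bob" extending his own lease keeps generation 1.
    assert_eq!(try_take_leased_lock(&leases, 10, "key", "bob"), Some(1));
}
```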
diff --git a/crates/matrix-sdk-sqlite/src/event_cache_store.rs b/crates/matrix-sdk-sqlite/src/event_cache_store.rs
index c5a21fb0a6b..59c88d934e4 100644
--- a/crates/matrix-sdk-sqlite/src/event_cache_store.rs
+++ b/crates/matrix-sdk-sqlite/src/event_cache_store.rs
@@ -5,6 +5,7 @@ use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
 use matrix_sdk_base::{
     event_cache::store::EventCacheStore,
     media::{MediaRequestParameters, UniqueKey},
+    store_locks::LockGeneration,
 };
 use matrix_sdk_store_encryption::StoreCipher;
 use ruma::MilliSecondsSinceUnixEpoch;
@@ -28,7 +29,7 @@ mod keys {
 /// This is used to figure out whether the SQLite database requires a migration.
 /// Every new SQL migration should imply a bump of this number, and changes in
 /// the [`run_migrations`] function.
-const DATABASE_VERSION: u8 = 2;
+const DATABASE_VERSION: u8 = 3;

 /// A SQLite-based event cache store.
 #[derive(Clone)]
@@ -142,6 +143,16 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
         .await?;
     }

+    if version < 3 {
+        conn.with_transaction(|txn| {
+            txn.execute_batch(include_str!(
+                "../migrations/event_cache_store/003_lease_locks_with_generation.sql"
+            ))?;
+            txn.set_db_version(3)
+        })
+        .await?;
+    }
+
     Ok(())
 }

@@ -154,32 +165,41 @@ impl EventCacheStore for SqliteEventCacheStore {
         lease_duration_ms: u32,
         key: &str,
         holder: &str,
-    ) -> Result<bool> {
+    ) -> Result<Option<LockGeneration>> {
         let key = key.to_owned();
         let holder = holder.to_owned();

         let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
         let expiration = now + lease_duration_ms as u64;

-        let num_touched = self
+        let generation = self
             .acquire()
             .await?
             .with_transaction(move |txn| {
-                txn.execute(
+                txn.query_row(
                     "INSERT INTO lease_locks (key, holder, expiration)
                     VALUES (?1, ?2, ?3)
-                    ON CONFLICT (key)
-                    DO
-                        UPDATE SET holder = ?2, expiration = ?3
-                        WHERE holder = ?2
+                    ON CONFLICT (key) DO
+                        UPDATE SET
+                            holder = excluded.holder,
+                            expiration = excluded.expiration,
+                            generation =
+                                CASE holder
+                                    WHEN excluded.holder THEN generation
+                                    ELSE generation + 1
+                                END
+                        WHERE
+                            holder = excluded.holder
                             OR expiration < ?4
-                    ",
+                    RETURNING generation",
                     (key, holder, expiration, now),
+                    |row| row.get(0),
                 )
+                .optional()
             })
             .await?;

-        Ok(num_touched == 1)
+        Ok(generation)
     }

     async fn add_media_content(
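The UPSERT above is self-contained enough to exercise against a plain SQLite database. A sketch using `rusqlite` directly (the schema is hypothetical, reduced to the columns the query touches; assumes an SQLite version with `RETURNING` support, i.e. 3.35+):

```rust
use rusqlite::{Connection, OptionalExtension};

fn main() -> rusqlite::Result<()> {
    // In-memory stand-in for the real store; `generation` defaults to 0 as
    // in the migration above.
    let conn = Connection::open_in_memory()?;
    conn.execute_batch(
        "CREATE TABLE lease_locks (
            key TEXT PRIMARY KEY NOT NULL,
            holder TEXT NOT NULL,
            expiration INTEGER NOT NULL,
            generation INTEGER NOT NULL DEFAULT 0
        );",
    )?;

    let take = |now: i64, holder: &str, lease: i64| -> rusqlite::Result<Option<i64>> {
        conn.query_row(
            "INSERT INTO lease_locks (key, holder, expiration)
             VALUES (?1, ?2, ?3)
             ON CONFLICT (key) DO
                UPDATE SET
                    holder = excluded.holder,
                    expiration = excluded.expiration,
                    generation =
                        CASE holder
                            WHEN excluded.holder THEN generation
                            ELSE generation + 1
                        END
                WHERE holder = excluded.holder OR expiration < ?4
             RETURNING generation",
            ("key", holder, now + lease, now),
            |row| row.get(0),
        )
        .optional()
    };

    assert_eq!(take(0, "alice", 300)?, Some(0)); // fresh insert: first generation
    assert_eq!(take(100, "alice", 300)?, Some(0)); // same holder: extended, no bump
    assert_eq!(take(200, "bob", 300)?, None); // lease still alive: rejected
    assert_eq!(take(500, "bob", 300)?, Some(1)); // expired: stolen, generation bump

    Ok(())
}
```

When the conflicting `UPDATE`'s `WHERE` clause rejects the row, `RETURNING` yields nothing, which `.optional()` turns into `None`; that is how the single statement encodes both "acquired (with generation)" and "rejected".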
diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs
index 8f0c2f19212..ce9d5579f74 100644
--- a/crates/matrix-sdk/src/event_cache/mod.rs
+++ b/crates/matrix-sdk/src/event_cache/mod.rs
@@ -225,6 +225,7 @@ impl EventCache {
         mut room_updates_feed: Receiver<RoomUpdates>,
     ) {
         trace!("Spawning the listen task");
+
         loop {
             match room_updates_feed.recv().await {
                 Ok(updates) => {
diff --git a/crates/matrix-sdk/src/media.rs b/crates/matrix-sdk/src/media.rs
index 9f686f1e2d8..b0712c36945 100644
--- a/crates/matrix-sdk/src/media.rs
+++ b/crates/matrix-sdk/src/media.rs
@@ -393,8 +393,13 @@ impl Media {
     ) -> Result<Vec<u8>> {
         // Read from the cache.
         if use_cache {
-            if let Some(content) =
-                self.client.event_cache_store().lock().await?.get_media_content(request).await?
+            if let Some(content) = self
+                .client
+                .event_cache_store()
+                .lock_unchecked()
+                .await?
+                .get_media_content(request)
+                .await?
             {
                 return Ok(content);
             }
@@ -497,7 +502,7 @@ impl Media {
         if use_cache {
             self.client
                 .event_cache_store()
-                .lock()
+                .lock_unchecked()
                 .await?
                 .add_media_content(request, content.clone())
                 .await?;
@@ -512,7 +517,13 @@ impl Media {
     ///
     /// * `request` - The `MediaRequest` of the content.
     pub async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
-        Ok(self.client.event_cache_store().lock().await?.remove_media_content(request).await?)
+        Ok(self
+            .client
+            .event_cache_store()
+            .lock_unchecked()
+            .await?
+            .remove_media_content(request)
+            .await?)
     }

     /// Delete all the media content corresponding to the given
@@ -522,7 +533,13 @@ impl Media {
     ///
     /// * `uri` - The `MxcUri` of the files.
     pub async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
-        Ok(self.client.event_cache_store().lock().await?.remove_media_content_for_uri(uri).await?)
+        Ok(self
+            .client
+            .event_cache_store()
+            .lock_unchecked()
+            .await?
+            .remove_media_content_for_uri(uri)
+            .await?)
     }

     /// Get the file of the given media event content.
diff --git a/crates/matrix-sdk/src/room/mod.rs b/crates/matrix-sdk/src/room/mod.rs
index c8b95ca283e..1968da9bfd2 100644
--- a/crates/matrix-sdk/src/room/mod.rs
+++ b/crates/matrix-sdk/src/room/mod.rs
@@ -1988,7 +1988,7 @@ impl Room {
             .await?;

         if store_in_cache {
-            let cache_store_lock_guard = self.client.event_cache_store().lock().await?;
+            let cache_store_lock_guard = self.client.event_cache_store().lock_unchecked().await?;

             // A failure to cache shouldn't prevent the whole upload from finishing
             // properly, so only log errors during caching.
diff --git a/crates/matrix-sdk/src/send_queue.rs b/crates/matrix-sdk/src/send_queue.rs
index c248dc841e9..f8d21002b1d 100644
--- a/crates/matrix-sdk/src/send_queue.rs
+++ b/crates/matrix-sdk/src/send_queue.rs
@@ -706,7 +706,7 @@ impl RoomSendQueue {
                 let data = room
                     .client()
                     .event_cache_store()
-                    .lock()
+                    .lock_unchecked()
                     .await?
                     .get_media_content(&cache_key)
                     .await?
diff --git a/crates/matrix-sdk/src/send_queue/upload.rs b/crates/matrix-sdk/src/send_queue/upload.rs
index e363574458b..868974f38a0 100644
--- a/crates/matrix-sdk/src/send_queue/upload.rs
+++ b/crates/matrix-sdk/src/send_queue/upload.rs
@@ -167,7 +167,7 @@ impl RoomSendQueue {
         let client = room.client();
         let cache_store = client
             .event_cache_store()
-            .lock()
+            .lock_unchecked()
             .await
             .map_err(RoomSendQueueStorageError::LockError)?;
@@ -301,7 +301,7 @@ impl QueueStorage {
         let cache_store = client
             .event_cache_store()
-            .lock()
+            .lock_unchecked()
             .await
             .map_err(RoomSendQueueStorageError::LockError)?;
@@ -530,7 +530,7 @@ impl QueueStorage {
         // At this point, all the requests and dependent requests have been cleaned up.
         // Perform the final step: empty the cache from the local items.
         {
-            let event_cache = client.event_cache_store().lock().await?;
+            let event_cache = client.event_cache_store().lock_unchecked().await?;
             event_cache
                 .remove_media_content_for_uri(&make_local_uri(&handles.upload_file_txn))
                 .await?;
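All the call sites switched to `lock_unchecked` here share one property: they only read or write media content through the store itself and keep no derived in-memory state, so a previous holder from another process is harmless to them. A sketch of the pattern (the function is hypothetical; the point is the `lock_unchecked` call):

```rust
use matrix_sdk_base::{
    event_cache::store::{EventCacheStore, EventCacheStoreLock},
    media::MediaRequestParameters,
};

async fn read_cached_media(
    lock: &EventCacheStoreLock,
    request: &MediaRequestParameters,
) -> Result<Option<Vec<u8>>, Box<dyn std::error::Error + Send + Sync>> {
    // Media content is read from and written to the store itself; nothing in
    // memory depends on who held the lock before us, so the poisoning check
    // would only add noise here.
    let guard = lock.lock_unchecked().await?;

    Ok(guard.get_media_content(request).await?)
}
```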