From 01641cb2ff443849d664cd634d6dcda0fcb9b1c4 Mon Sep 17 00:00:00 2001
From: Ivan Enderlin
Date: Wed, 9 Jul 2025 15:31:01 +0200
Subject: [PATCH 1/9] feat(sqlite): `SqliteStoreConfig::pool_size` enforces a minimum of 2.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This patch updates `SqliteStoreConfig::pool_size` to be at least 2. We
need 2 connections: one for write operations, and one for read
operations. This behaviour is introduced in the next patches.
---
 crates/matrix-sdk-sqlite/src/lib.rs | 20 +++++++++++++++++---
 1 file changed, 17 insertions(+), 3 deletions(-)

diff --git a/crates/matrix-sdk-sqlite/src/lib.rs b/crates/matrix-sdk-sqlite/src/lib.rs
index a8ab210d121..7130e15039e 100644
--- a/crates/matrix-sdk-sqlite/src/lib.rs
+++ b/crates/matrix-sdk-sqlite/src/lib.rs
@@ -25,6 +25,7 @@ mod event_cache_store;
 mod state_store;
 mod utils;
 
 use std::{
+    cmp::max,
     fmt,
     path::{Path, PathBuf},
 };
@@ -66,6 +67,12 @@ impl fmt::Debug for SqliteStoreConfig {
     }
 }
 
+/// The minimum size of the connections pool.
+///
+/// We need at least 2 connections: one connection for write operations, and
+/// one connection for read operations.
+const POOL_MINIMUM_SIZE: usize = 2;
+
 impl SqliteStoreConfig {
     /// Create a new [`SqliteStoreConfig`] with a path representing the
     /// directory containing the store database.
@@ -76,7 +83,7 @@ impl SqliteStoreConfig {
         Self {
             path: path.as_ref().to_path_buf(),
             passphrase: None,
-            pool_config: PoolConfig::new(num_cpus::get_physical() * 4),
+            pool_config: PoolConfig::new(max(POOL_MINIMUM_SIZE, num_cpus::get_physical() * 4)),
             runtime_config: RuntimeConfig::default(),
         }
     }
@@ -122,7 +129,7 @@ impl SqliteStoreConfig {
     ///
     /// See [`deadpool_sqlite::PoolConfig::max_size`] to learn more.
     pub fn pool_max_size(mut self, max_size: usize) -> Self {
-        self.pool_config.max_size = max_size;
+        self.pool_config.max_size = max(POOL_MINIMUM_SIZE, max_size);
         self
     }
@@ -218,7 +225,7 @@ mod tests {
         path::{Path, PathBuf},
     };
 
-    use super::SqliteStoreConfig;
+    use super::{SqliteStoreConfig, POOL_MINIMUM_SIZE};
 
     #[test]
     fn test_new() {
@@ -263,4 +270,11 @@ mod tests {
 
         assert_eq!(store_config.path, PathBuf::from("bar"));
     }
+
+    #[test]
+    fn test_pool_size_has_a_minimum() {
+        let store_config = SqliteStoreConfig::new(Path::new("foo")).pool_max_size(1);
+
+        assert_eq!(store_config.pool_config.max_size, POOL_MINIMUM_SIZE);
+    }
 }

From e854a56a022ba920f05e4ac3c3f4533d68757cb1 Mon Sep 17 00:00:00 2001
From: Ivan Enderlin
Date: Wed, 9 Jul 2025 17:20:30 +0200
Subject: [PATCH 2/9] =?UTF-8?q?feat(sqlite):=20`SqliteEventCacheStore`=20h?=
 =?UTF-8?q?as=201=C2=A0write=20connection.?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Until now, `SqliteEventCacheStore` has managed a pool of connections. A
connection is fetched from this pool and operations are executed on it,
regardless of whether they are read or write operations.

We are seeing more and more _database is busy_ errors. We believe this
is because too many write operations are executed concurrently. The
solution is to use multiple connections for read operations, and a
single connection for write operations. That way, concurrent writes are
no longer possible, and we hope this will reduce the number of
_database is busy_ errors to zero. That's our guess.

This patch does that. When the pool of connections is created, a
connection is elected as the `write_connection`.
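As a minimal sketch of the pattern (simplified, hypothetical types; the
real store keeps a `deadpool` connection behind the lock):

    use std::sync::Arc;
    use tokio::sync::{Mutex, OwnedMutexGuard};

    struct Connection; // Stand-in for the SQLite connection type.

    #[derive(Clone)]
    struct Store {
        write_connection: Arc<Mutex<Connection>>,
    }

    impl Store {
        // Tokio's async `Mutex` is fair: waiters are served in FIFO
        // order, so write operations run in the order they were
        // requested.
        async fn write(&self) -> OwnedMutexGuard<Connection> {
            self.write_connection.clone().lock_owned().await
        }
    }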
To get a connection for read operations, one has to use the new
`SqliteEventCacheStore::read` method (it replaces the `acquire`
method). To get a connection for write operations, one has to use the
new `SqliteEventCacheStore::write` method. It returns an
`OwnedMutexGuard` from an async `Mutex`. All callers that want to run
write operations on this store have to wait their turn. This `Mutex` is
fair: the first to wait on the lock is the first to take it (FIFO).
This guarantees the execution ordering the code expects.

The rest of the patch updates all spots where `acquire` was used and
replaces them with `read()` or `write()`. Particular care was taken to
check whether other places use `SqliteEventCacheStore::pool` directly;
none remain except `read()` and `write()`.
---
 .../src/event_cache_store.rs | 144 +++++++++++-------
 1 file changed, 89 insertions(+), 55 deletions(-)

diff --git a/crates/matrix-sdk-sqlite/src/event_cache_store.rs b/crates/matrix-sdk-sqlite/src/event_cache_store.rs
index 207f4e1010b..6f058c7275f 100644
--- a/crates/matrix-sdk-sqlite/src/event_cache_store.rs
+++ b/crates/matrix-sdk-sqlite/src/event_cache_store.rs
@@ -43,7 +43,10 @@ use ruma::{
     OwnedEventId, RoomId,
 };
 use rusqlite::{params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior};
-use tokio::fs;
+use tokio::{
+    fs,
+    sync::{Mutex, OwnedMutexGuard},
+};
 use tracing::{debug, error, trace};
 
 use crate::{
@@ -86,7 +89,16 @@ const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";
 #[derive(Clone)]
 pub struct SqliteEventCacheStore {
     store_cipher: Option<Arc<StoreCipher>>,
+
+    /// The pool of connections.
     pool: SqlitePool,
+
+    /// We distinguish between connections for read operations and connections
+    /// for write operations. A single connection is kept apart for write
+    /// operations. All other connections are used for read operations. The
+    /// lock is used to ensure there is one owner at a time.
+    write_connection: Arc<Mutex<SqliteAsyncConn>>,
+
     media_service: MediaService,
 }
 
@@ -125,7 +137,7 @@ impl SqliteEventCacheStore {
         let pool = config.create_pool(Runtime::Tokio1)?;
 
         let this = Self::open_with_pool(pool, passphrase.as_deref()).await?;
-        this.pool.get().await?.apply_runtime_config(runtime_config).await?;
+        this.write().await?.apply_runtime_config(runtime_config).await?;
 
         Ok(this)
     }
@@ -151,10 +163,17 @@ impl SqliteEventCacheStore {
         let last_media_cleanup_time = conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await?;
         media_service.restore(media_retention_policy, last_media_cleanup_time);
 
-        Ok(Self { store_cipher, pool, media_service })
+        Ok(Self {
+            store_cipher,
+            pool,
+            // Use `conn` as our selected write connection.
+            write_connection: Arc::new(Mutex::new(conn)),
+            media_service,
+        })
     }
 
-    async fn acquire(&self) -> Result<SqliteAsyncConn> {
+    // Acquire a connection for executing read operations.
+    async fn read(&self) -> Result<SqliteAsyncConn> {
         let connection = self.pool.get().await?;
 
         // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
         // support must be enabled on a per-connection basis. Execute it every
         // time we try to get a connection, since we can't guarantee a previous
@@ -166,6 +185,19 @@ impl SqliteEventCacheStore {
         Ok(connection)
     }
 
+    // Acquire a connection for executing write operations.
+    async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
+        let connection = self.write_connection.clone().lock_owned().await;
+
+        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
+        // support must be enabled on a per-connection basis. Execute it every
+        // time we try to get a connection, since we can't guarantee a previous
+        // connection did enable it before.
+ connection.execute_batch("PRAGMA foreign_keys = ON;").await?; + + Ok(connection) + } + fn map_row_to_chunk( row: &rusqlite::Row<'_>, ) -> Result<(u64, Option, Option, String), rusqlite::Error> { @@ -425,7 +457,7 @@ impl EventCacheStore for SqliteEventCacheStore { let expiration = now + lease_duration_ms as u64; let num_touched = self - .acquire() + .write() .await? .with_transaction(move |txn| { txn.execute( @@ -457,7 +489,7 @@ impl EventCacheStore for SqliteEventCacheStore { let linked_chunk_id = linked_chunk_id.to_owned(); let this = self.clone(); - with_immediate_transaction(self.acquire().await?, move |txn| { + with_immediate_transaction(self, move |txn| { for up in updates { match up { Update::NewItemsChunk { previous, new, next } => { @@ -783,7 +815,7 @@ impl EventCacheStore for SqliteEventCacheStore { let this = self.clone(); let result = self - .acquire() + .read() .await? .with_transaction(move |txn| -> Result<_> { let mut items = Vec::new(); @@ -821,7 +853,7 @@ impl EventCacheStore for SqliteEventCacheStore { let hashed_linked_chunk_id = self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key()); - self.acquire() + self.read() .await? .with_transaction(move |txn| -> Result<_> { // I'm not a DB analyst, so for my own future sanity: this query joins the @@ -884,7 +916,7 @@ impl EventCacheStore for SqliteEventCacheStore { let this = self.clone(); self - .acquire() + .read() .await? .with_transaction(move |txn| -> Result<_> { // Find the latest chunk identifier to generate a `ChunkIdentifierGenerator`, and count the number of chunks. @@ -977,7 +1009,7 @@ impl EventCacheStore for SqliteEventCacheStore { let this = self.clone(); self - .acquire() + .read() .await? .with_transaction(move |txn| -> Result<_> { // Find the chunk before the chunk identified by `before_chunk_identifier`. @@ -1018,7 +1050,7 @@ impl EventCacheStore for SqliteEventCacheStore { } async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> { - self.acquire() + self.write() .await? .with_transaction(move |txn| { // Remove all the chunks, and let cascading do its job. @@ -1047,7 +1079,7 @@ impl EventCacheStore for SqliteEventCacheStore { self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key()); let linked_chunk_id = linked_chunk_id.to_owned(); - self.acquire() + self.read() .await? .with_transaction(move |txn| -> Result<_> { txn.chunk_large_query_over(events, None, move |txn, events| { @@ -1119,7 +1151,7 @@ impl EventCacheStore for SqliteEventCacheStore { let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id); - self.acquire() + self.read() .await? .with_transaction(move |txn| -> Result<_> { let Some(event) = txn @@ -1153,7 +1185,7 @@ impl EventCacheStore for SqliteEventCacheStore { let filters = filters.map(ToOwned::to_owned); let this = self.clone(); - self.acquire() + self.read() .await? .with_transaction(move |txn| -> Result<_> { let filter_query = if let Some(filters) = compute_filters_string(filters.as_deref()) @@ -1216,7 +1248,7 @@ impl EventCacheStore for SqliteEventCacheStore { let event_id = event_id.to_string(); let encoded_event = self.encode_event(&event)?; - self.acquire() + self.write() .await? 
.with_transaction(move |txn| -> Result<_> { txn.execute( @@ -1248,7 +1280,7 @@ impl EventCacheStore for SqliteEventCacheStore { let new_uri = self.encode_key(keys::MEDIA, to.source.unique_key()); let new_format = self.encode_key(keys::MEDIA, to.format.unique_key()); - let conn = self.acquire().await?; + let conn = self.write().await?; conn.execute( r#"UPDATE media SET uri = ?, format = ? WHERE uri = ? AND format = ?"#, (new_uri, new_format, prev_uri, prev_format), @@ -1266,7 +1298,7 @@ impl EventCacheStore for SqliteEventCacheStore { let uri = self.encode_key(keys::MEDIA, request.source.unique_key()); let format = self.encode_key(keys::MEDIA, request.format.unique_key()); - let conn = self.acquire().await?; + let conn = self.write().await?; conn.execute("DELETE FROM media WHERE uri = ? AND format = ?", (uri, format)).await?; Ok(()) @@ -1282,7 +1314,7 @@ impl EventCacheStore for SqliteEventCacheStore { async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> { let uri = self.encode_key(keys::MEDIA, uri); - let conn = self.acquire().await?; + let conn = self.write().await?; conn.execute("DELETE FROM media WHERE uri = ?", (uri,)).await?; Ok(()) @@ -1320,7 +1352,7 @@ impl EventCacheStoreMedia for SqliteEventCacheStore { async fn media_retention_policy_inner( &self, ) -> Result, Self::Error> { - let conn = self.acquire().await?; + let conn = self.read().await?; conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await } @@ -1328,7 +1360,7 @@ impl EventCacheStoreMedia for SqliteEventCacheStore { &self, policy: MediaRetentionPolicy, ) -> Result<(), Self::Error> { - let conn = self.acquire().await?; + let conn = self.write().await?; conn.set_serialized_kv(keys::MEDIA_RETENTION_POLICY, policy).await?; Ok(()) } @@ -1352,7 +1384,7 @@ impl EventCacheStoreMedia for SqliteEventCacheStore { let format = self.encode_key(keys::MEDIA, request.format.unique_key()); let timestamp = time_to_timestamp(last_access); - let conn = self.acquire().await?; + let conn = self.write().await?; conn.execute( "INSERT OR REPLACE INTO media (uri, format, data, last_access, ignore_policy) VALUES (?, ?, ?, ?, ?)", (uri, format, data, timestamp, ignore_policy), @@ -1371,7 +1403,7 @@ impl EventCacheStoreMedia for SqliteEventCacheStore { let format = self.encode_key(keys::MEDIA, request.format.unique_key()); let ignore_policy = ignore_policy.is_yes(); - let conn = self.acquire().await?; + let conn = self.write().await?; conn.execute( r#"UPDATE media SET ignore_policy = ? WHERE uri = ? AND format = ?"#, (ignore_policy, uri, format), @@ -1390,7 +1422,7 @@ impl EventCacheStoreMedia for SqliteEventCacheStore { let format = self.encode_key(keys::MEDIA, request.format.unique_key()); let timestamp = time_to_timestamp(current_time); - let conn = self.acquire().await?; + let conn = self.write().await?; let data = conn .with_transaction::<_, rusqlite::Error, _>(move |txn| { // Update the last access. @@ -1421,7 +1453,7 @@ impl EventCacheStoreMedia for SqliteEventCacheStore { let uri = self.encode_key(keys::MEDIA, uri); let timestamp = time_to_timestamp(current_time); - let conn = self.acquire().await?; + let conn = self.write().await?; let data = conn .with_transaction::<_, rusqlite::Error, _>(move |txn| { // Update the last access. 
@@ -1451,7 +1483,7 @@ impl EventCacheStoreMedia for SqliteEventCacheStore { return Ok(()); } - let conn = self.acquire().await?; + let conn = self.write().await?; let removed = conn .with_transaction::<_, Error, _>(move |txn| { let mut removed = false; @@ -1570,7 +1602,7 @@ impl EventCacheStoreMedia for SqliteEventCacheStore { } async fn last_media_cleanup_time_inner(&self) -> Result, Self::Error> { - let conn = self.acquire().await?; + let conn = self.read().await?; conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await } } @@ -1583,33 +1615,35 @@ async fn with_immediate_transaction< T: Send + 'static, F: FnOnce(&Transaction<'_>) -> Result + Send + 'static, >( - conn: SqliteAsyncConn, + this: &SqliteEventCacheStore, f: F, ) -> Result { - conn.interact(move |conn| -> Result { - // Start the transaction in IMMEDIATE mode since all updates may cause writes, - // to avoid read transactions upgrading to write mode and causing - // SQLITE_BUSY errors. See also: https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions - conn.set_transaction_behavior(TransactionBehavior::Immediate); - - let code = || -> Result { - let txn = conn.transaction()?; - let res = f(&txn)?; - txn.commit()?; - Ok(res) - }; - - let res = code(); - - // Reset the transaction behavior to use Deferred, after this transaction has - // been run, whether it was successful or not. - conn.set_transaction_behavior(TransactionBehavior::Deferred); - - res - }) - .await - // SAFETY: same logic as in [`deadpool::managed::Object::with_transaction`].` - .unwrap() + this.write() + .await? + .interact(move |conn| -> Result { + // Start the transaction in IMMEDIATE mode since all updates may cause writes, + // to avoid read transactions upgrading to write mode and causing + // SQLITE_BUSY errors. See also: https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions + conn.set_transaction_behavior(TransactionBehavior::Immediate); + + let code = || -> Result { + let txn = conn.transaction()?; + let res = f(&txn)?; + txn.commit()?; + Ok(res) + }; + + let res = code(); + + // Reset the transaction behavior to use Deferred, after this transaction has + // been run, whether it was successful or not. + conn.set_transaction_behavior(TransactionBehavior::Deferred); + + res + }) + .await + // SAFETY: same logic as in [`deadpool::managed::Object::with_transaction`].` + .unwrap() } fn insert_chunk( @@ -1716,7 +1750,7 @@ mod tests { async fn get_event_cache_store_content_sorted_by_last_access( event_cache_store: &SqliteEventCacheStore, ) -> Vec> { - let sqlite_db = event_cache_store.acquire().await.expect("accessing sqlite db failed"); + let sqlite_db = event_cache_store.read().await.expect("accessing sqlite db failed"); sqlite_db .prepare("SELECT data FROM media ORDER BY last_access DESC", |mut stmt| { stmt.query(())?.mapped(|row| row.get(0)).collect() @@ -2006,7 +2040,7 @@ mod tests { // Check that cascading worked. Yes, SQLite, I doubt you. let gaps = store - .acquire() + .read() .await .unwrap() .with_transaction(|txn| -> rusqlite::Result<_> { @@ -2128,7 +2162,7 @@ mod tests { // Make sure the position have been updated for the remaining events. let num_rows: u64 = store - .acquire() + .read() .await .unwrap() .with_transaction(move |txn| { @@ -2277,7 +2311,7 @@ mod tests { // Check that cascading worked. Yes, SQLite, I doubt you. 
 store
-            .acquire()
+            .read()
             .await
             .unwrap()
             .with_transaction(|txn| -> rusqlite::Result<_> {

From fc21286cad532608baa4cf453470cb2699fca291 Mon Sep 17 00:00:00 2001
From: Ivan Enderlin
Date: Sun, 13 Jul 2025 08:01:17 +0200
Subject: [PATCH 3/9] feat(sqlite): Add logs around `read` and `write`.

---
 crates/matrix-sdk-sqlite/src/event_cache_store.rs | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/crates/matrix-sdk-sqlite/src/event_cache_store.rs b/crates/matrix-sdk-sqlite/src/event_cache_store.rs
index 6f058c7275f..d040425ed03 100644
--- a/crates/matrix-sdk-sqlite/src/event_cache_store.rs
+++ b/crates/matrix-sdk-sqlite/src/event_cache_store.rs
@@ -47,7 +47,7 @@ use tokio::{
     fs,
     sync::{Mutex, OwnedMutexGuard},
 };
-use tracing::{debug, error, trace};
+use tracing::{debug, error, instrument, trace};
 
 use crate::{
     error::{Error, Result},
@@ -173,9 +173,14 @@ impl SqliteEventCacheStore {
     }
 
     // Acquire a connection for executing read operations.
+    #[instrument(skip_all)]
     async fn read(&self) -> Result<SqliteAsyncConn> {
+        trace!("Taking a `read` connection");
+
         let connection = self.pool.get().await?;
 
+        trace!("`read` connection taken");
+
         // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
         // support must be enabled on a per-connection basis. Execute it every
         // time we try to get a connection, since we can't guarantee a previous
@@ -186,9 +191,14 @@ impl SqliteEventCacheStore {
     }
 
     // Acquire a connection for executing write operations.
+    #[instrument(skip_all)]
     async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
+        trace!("Taking a `write` connection");
+
         let connection = self.write_connection.clone().lock_owned().await;
 
+        trace!("`write` connection taken");
+
         // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
         // support must be enabled on a per-connection basis. Execute it every
         // time we try to get a connection, since we can't guarantee a previous

From 7a87580b9d7f3302da8a0aebcefeed3d0ea589cc Mon Sep 17 00:00:00 2001
From: Ivan Enderlin
Date: Sun, 13 Jul 2025 08:08:51 +0200
Subject: [PATCH 4/9] feat(common): `TracingTimer` uses the `Debug` impl of `Duration`.

This changes the `TracingTimer` message to use the `Debug` impl of
`Duration` instead of displaying it as milliseconds. It can help spot
seconds without counting all the digits.
---
 crates/matrix-sdk-common/src/tracing_timer.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/matrix-sdk-common/src/tracing_timer.rs b/crates/matrix-sdk-common/src/tracing_timer.rs
index ccf478644c3..e665ed5e28a 100644
--- a/crates/matrix-sdk-common/src/tracing_timer.rs
+++ b/crates/matrix-sdk-common/src/tracing_timer.rs
@@ -33,7 +33,7 @@ impl std::fmt::Debug for TracingTimer {
 
 impl Drop for TracingTimer {
     fn drop(&mut self) {
-        let message = format!("{} finished in {}ms", self.id, self.start.elapsed().as_millis());
+        let message = format!("`{}` finished in {:?}", self.id, self.start.elapsed());
 
         let enabled = tracing::level_enabled!(self.level) && {
             let interest = self.callsite.interest();

From a5d88d9d048ed5be7d84d5f4f2d49536779d22a2 Mon Sep 17 00:00:00 2001
From: Ivan Enderlin
Date: Sun, 13 Jul 2025 08:10:34 +0200
Subject: [PATCH 5/9] refactor(common): Rename `TracingTimer::new_debug`.

This patch renames `TracingTimer::new_debug` to `new`. The
documentation claims it sets the log level to `debug`, while the
`level` is actually an argument of the constructor. The documentation
is thus wrong, and the constructor must be renamed.
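For illustration, a call site after the rename looks like this
(hypothetical example; the single-argument arm of `timer!` supplies a
default level):

    // `timer!` expands to `TracingTimer::new(&__CALLSITE, ..., level)`.
    // The guard logs on drop, using the format from the previous patch,
    // e.g. `_load chunks_ finished in 12.3ms`.
    let _timer_guard = timer!(tracing::Level::DEBUG, "load chunks");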
---
 crates/matrix-sdk-common/src/tracing_timer.rs | 14 +++++---------
 1 file changed, 5 insertions(+), 9 deletions(-)

diff --git a/crates/matrix-sdk-common/src/tracing_timer.rs b/crates/matrix-sdk-common/src/tracing_timer.rs
index e665ed5e28a..5206098a50a 100644
--- a/crates/matrix-sdk-common/src/tracing_timer.rs
+++ b/crates/matrix-sdk-common/src/tracing_timer.rs
@@ -33,7 +33,7 @@ impl std::fmt::Debug for TracingTimer {
 
 impl Drop for TracingTimer {
     fn drop(&mut self) {
-        let message = format!("`{}` finished in {:?}", self.id, self.start.elapsed());
+        let message = format!("_{}_ finished in {:?}", self.id, self.start.elapsed());
 
         let enabled = tracing::level_enabled!(self.level) && {
             let interest = self.callsite.interest();
@@ -62,12 +62,8 @@ impl Drop for TracingTimer {
 }
 
 impl TracingTimer {
-    /// Create a new `TracingTimer` at the `debug` log level.
-    pub fn new_debug(
-        callsite: &'static DefaultCallsite,
-        id: String,
-        level: tracing::Level,
-    ) -> Self {
+    /// Create a new `TracingTimer`.
+    pub fn new(callsite: &'static DefaultCallsite, id: String, level: tracing::Level) -> Self {
         Self { id, callsite, start: Instant::now(), level }
     }
 }
@@ -111,7 +107,7 @@ macro_rules! timer {
             fields: []
         };
 
-        $crate::tracing_timer::TracingTimer::new_debug(&__CALLSITE, $string.into(), $level)
+        $crate::tracing_timer::TracingTimer::new(&__CALLSITE, $string.into(), $level)
     }};
 
     ($string:expr) => {
@@ -133,7 +129,7 @@ mod tests {
         let _timer_guard = timer!(tracing::Level::DEBUG, "test");
         tokio::time::sleep(ruma::time::Duration::from_millis(123)).await;
         // Displays: 2023-08-25T15:18:31.169498Z DEBUG
-        // matrix_sdk_common::tracing_timer::tests: test finished in
+        // matrix_sdk_common::tracing_timer::tests: _test_ finished in
         // 124ms
     }
 }

From 6f932a904ebf9bb652535b899630da22e2f327bc Mon Sep 17 00:00:00 2001
From: Ivan Enderlin
Date: Sun, 13 Jul 2025 08:36:14 +0200
Subject: [PATCH 6/9] feat(sqlite): Add `timer!` tracing to `SqliteEventCacheStore`'s `read` and `write`.

---
 crates/matrix-sdk-sqlite/src/event_cache_store.rs | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/crates/matrix-sdk-sqlite/src/event_cache_store.rs b/crates/matrix-sdk-sqlite/src/event_cache_store.rs
index d040425ed03..363253c8cf0 100644
--- a/crates/matrix-sdk-sqlite/src/event_cache_store.rs
+++ b/crates/matrix-sdk-sqlite/src/event_cache_store.rs
@@ -36,6 +36,7 @@ use matrix_sdk_base::{
         Position, RawChunk, Update,
     },
     media::{MediaRequestParameters, UniqueKey},
+    timer,
 };
 use matrix_sdk_store_encryption::StoreCipher;
 use ruma::{
@@ -176,11 +177,10 @@ impl SqliteEventCacheStore {
     #[instrument(skip_all)]
     async fn read(&self) -> Result<SqliteAsyncConn> {
         trace!("Taking a `read` connection");
+        let _timer = timer!("connection");
 
         let connection = self.pool.get().await?;
 
-        trace!("`read` connection taken");
-
         // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
         // support must be enabled on a per-connection basis.
Execute it every // time we try to get a connection, since we can't guarantee a previous From 0f08631d3896b58cae57cec969ac17f16540fcf2 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Sun, 13 Jul 2025 08:37:25 +0200 Subject: [PATCH 7/9] feat(sqlite): Add `#[instrument]` around all `SqliteEventCacheStore` methods. --- .../src/event_cache_store.rs | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/crates/matrix-sdk-sqlite/src/event_cache_store.rs b/crates/matrix-sdk-sqlite/src/event_cache_store.rs index 363253c8cf0..64863e23428 100644 --- a/crates/matrix-sdk-sqlite/src/event_cache_store.rs +++ b/crates/matrix-sdk-sqlite/src/event_cache_store.rs @@ -453,6 +453,7 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> { impl EventCacheStore for SqliteEventCacheStore { type Error = Error; + #[instrument(skip(self))] async fn try_take_leased_lock( &self, lease_duration_ms: u32, @@ -486,6 +487,7 @@ impl EventCacheStore for SqliteEventCacheStore { Ok(num_touched == 1) } + #[instrument(skip(self, updates))] async fn handle_linked_chunk_updates( &self, linked_chunk_id: LinkedChunkId<'_>, @@ -814,6 +816,7 @@ impl EventCacheStore for SqliteEventCacheStore { Ok(()) } + #[instrument(skip(self))] async fn load_all_chunks( &self, linked_chunk_id: LinkedChunkId<'_>, @@ -855,6 +858,7 @@ impl EventCacheStore for SqliteEventCacheStore { Ok(result) } + #[instrument(skip(self))] async fn load_all_chunks_metadata( &self, linked_chunk_id: LinkedChunkId<'_>, @@ -915,6 +919,7 @@ impl EventCacheStore for SqliteEventCacheStore { .await } + #[instrument(skip(self))] async fn load_last_chunk( &self, linked_chunk_id: LinkedChunkId<'_>, @@ -1007,6 +1012,7 @@ impl EventCacheStore for SqliteEventCacheStore { .await } + #[instrument(skip(self))] async fn load_previous_chunk( &self, linked_chunk_id: LinkedChunkId<'_>, @@ -1058,6 +1064,7 @@ impl EventCacheStore for SqliteEventCacheStore { .await } + #[instrument(skip(self))] async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> { self.write() .await? 
@@ -1071,6 +1078,7 @@ impl EventCacheStore for SqliteEventCacheStore { Ok(()) } + #[instrument(skip(self, events))] async fn filter_duplicated_events( &self, linked_chunk_id: LinkedChunkId<'_>, @@ -1150,6 +1158,7 @@ impl EventCacheStore for SqliteEventCacheStore { .await } + #[instrument(skip(self, event_id))] async fn find_event( &self, room_id: &RoomId, @@ -1179,6 +1188,7 @@ impl EventCacheStore for SqliteEventCacheStore { .await } + #[instrument(skip(self, event_id, filters))] async fn find_event_relations( &self, room_id: &RoomId, @@ -1247,6 +1257,7 @@ impl EventCacheStore for SqliteEventCacheStore { .await } + #[instrument(skip(self, event))] async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> { let Some(event_id) = event.event_id() else { error!(%room_id, "Trying to save an event with no ID"); @@ -1269,6 +1280,7 @@ impl EventCacheStore for SqliteEventCacheStore { .await } + #[instrument(skip_all)] async fn add_media_content( &self, request: &MediaRequestParameters, @@ -1278,6 +1290,7 @@ impl EventCacheStore for SqliteEventCacheStore { self.media_service.add_media_content(self, request, content, ignore_policy).await } + #[instrument(skip_all)] async fn replace_media_key( &self, from: &MediaRequestParameters, @@ -1299,10 +1312,12 @@ impl EventCacheStore for SqliteEventCacheStore { Ok(()) } + #[instrument(skip_all)] async fn get_media_content(&self, request: &MediaRequestParameters) -> Result>> { self.media_service.get_media_content(self, request).await } + #[instrument(skip_all)] async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> { let uri = self.encode_key(keys::MEDIA, request.source.unique_key()); let format = self.encode_key(keys::MEDIA, request.format.unique_key()); @@ -1313,6 +1328,7 @@ impl EventCacheStore for SqliteEventCacheStore { Ok(()) } + #[instrument(skip(self))] async fn get_media_content_for_uri( &self, uri: &MxcUri, @@ -1320,6 +1336,7 @@ impl EventCacheStore for SqliteEventCacheStore { self.media_service.get_media_content_for_uri(self, uri).await } + #[instrument(skip(self))] async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> { let uri = self.encode_key(keys::MEDIA, uri); @@ -1329,6 +1346,7 @@ impl EventCacheStore for SqliteEventCacheStore { Ok(()) } + #[instrument(skip_all)] async fn set_media_retention_policy( &self, policy: MediaRetentionPolicy, @@ -1336,10 +1354,12 @@ impl EventCacheStore for SqliteEventCacheStore { self.media_service.set_media_retention_policy(self, policy).await } + #[instrument(skip_all)] fn media_retention_policy(&self) -> MediaRetentionPolicy { self.media_service.media_retention_policy() } + #[instrument(skip_all)] async fn set_ignore_media_retention_policy( &self, request: &MediaRequestParameters, @@ -1348,6 +1368,7 @@ impl EventCacheStore for SqliteEventCacheStore { self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await } + #[instrument(skip_all)] async fn clean_up_media_cache(&self) -> Result<(), Self::Error> { self.media_service.clean_up_media_cache(self).await } From e296f9d938f213ec02803660e5e38cceda9a6351 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Sun, 13 Jul 2025 08:53:35 +0200 Subject: [PATCH 8/9] feat(sqlite): Instrument `SqliteEventCacheStore::open_with_config`. 
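Roughly, the added instrumentation behaves like this sketch
(hypothetical, simplified types; the `?` sigil records a value with its
`Debug` impl):

    use tracing::{debug, instrument};

    #[derive(Debug)]
    struct Config {
        path: std::path::PathBuf,
    }

    // `skip(config)` keeps the whole config out of the span's fields;
    // only `path` is recorded on the span, via its `Debug` impl.
    #[instrument(skip(config), fields(path = ?config.path))]
    async fn open_with_config(config: Config) {
        // The full config is still logged once, explicitly.
        debug!(?config);
    }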
---
 crates/matrix-sdk-sqlite/src/event_cache_store.rs | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/crates/matrix-sdk-sqlite/src/event_cache_store.rs b/crates/matrix-sdk-sqlite/src/event_cache_store.rs
index 64863e23428..0da89159432 100644
--- a/crates/matrix-sdk-sqlite/src/event_cache_store.rs
+++ b/crates/matrix-sdk-sqlite/src/event_cache_store.rs
@@ -127,7 +127,12 @@ impl SqliteEventCacheStore {
     }
 
     /// Open the SQLite-based event cache store with the config open config.
+    #[instrument(skip(config), fields(path = ?config.path))]
     pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
+        debug!(?config);
+
+        let _timer = timer!("open_with_config");
+
         let SqliteStoreConfig { path, passphrase, pool_config, runtime_config } = config;
 
         fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;

From fd73cb36c01f039f00eea3e6014166153281f1ce Mon Sep 17 00:00:00 2001
From: Ivan Enderlin
Date: Sun, 13 Jul 2025 09:09:44 +0200
Subject: [PATCH 9/9] feat(sqlite): Add more `timer!` logs in each `EventCacheStore` method.

This patch adds `timer!` logs in each `EventCacheStore` method of
`SqliteEventCacheStore`. It will help to know the execution duration of
each of these methods.
---
 .../src/event_cache_store.rs | 43 +++++++++++++++++++
 1 file changed, 43 insertions(+)

diff --git a/crates/matrix-sdk-sqlite/src/event_cache_store.rs b/crates/matrix-sdk-sqlite/src/event_cache_store.rs
index 0da89159432..b5a18ef7e0d 100644
--- a/crates/matrix-sdk-sqlite/src/event_cache_store.rs
+++ b/crates/matrix-sdk-sqlite/src/event_cache_store.rs
@@ -465,6 +465,8 @@ impl EventCacheStore for SqliteEventCacheStore {
         key: &str,
         holder: &str,
     ) -> Result<bool, Self::Error> {
+        let _timer = timer!("method");
+
         let key = key.to_owned();
         let holder = holder.to_owned();
 
@@ -498,6 +500,8 @@ impl EventCacheStore for SqliteEventCacheStore {
         linked_chunk_id: LinkedChunkId<'_>,
         updates: Vec<Update<Event, Gap>>,
     ) -> Result<(), Self::Error> {
+        let _timer = timer!("method");
+
         // Use a single transaction throughout this function, so that either all updates
         // work, or none is taken into account.
         let hashed_linked_chunk_id =
@@ -826,6 +830,8 @@ impl EventCacheStore for SqliteEventCacheStore {
         &self,
         linked_chunk_id: LinkedChunkId<'_>,
     ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
+        let _timer = timer!("method");
+
         let hashed_linked_chunk_id =
             self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
 
@@ -868,6 +874,8 @@ impl EventCacheStore for SqliteEventCacheStore {
         &self,
         linked_chunk_id: LinkedChunkId<'_>,
     ) -> Result<Vec<ChunkMetadata>, Self::Error> {
+        let _timer = timer!("method");
+
         let hashed_linked_chunk_id =
             self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
 
@@ -929,6 +937,8 @@ impl EventCacheStore for SqliteEventCacheStore {
         &self,
         linked_chunk_id: LinkedChunkId<'_>,
     ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
+        let _timer = timer!("method");
+
         let hashed_linked_chunk_id =
             self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
 
@@ -1023,6 +1033,8 @@ impl EventCacheStore for SqliteEventCacheStore {
         linked_chunk_id: LinkedChunkId<'_>,
         before_chunk_identifier: ChunkIdentifier,
     ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
+        let _timer = timer!("method");
+
         let hashed_linked_chunk_id =
             self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
 
@@ -1071,6 +1083,8 @@ impl EventCacheStore for SqliteEventCacheStore {
 
     #[instrument(skip(self))]
     async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
+        let _timer = timer!("method");
+
         self.write()
             .await?
.with_transaction(move |txn| { @@ -1080,6 +1094,7 @@ impl EventCacheStore for SqliteEventCacheStore { txn.execute("DELETE FROM events", ()) }) .await?; + Ok(()) } @@ -1089,6 +1104,8 @@ impl EventCacheStore for SqliteEventCacheStore { linked_chunk_id: LinkedChunkId<'_>, events: Vec, ) -> Result, Self::Error> { + let _timer = timer!("method"); + // If there's no events for which we want to check duplicates, we can return // early. It's not only an optimization to do so: it's required, otherwise the // `repeat_vars` call below will panic. @@ -1169,6 +1186,8 @@ impl EventCacheStore for SqliteEventCacheStore { room_id: &RoomId, event_id: &EventId, ) -> Result, Self::Error> { + let _timer = timer!("method"); + let event_id = event_id.to_owned(); let this = self.clone(); @@ -1200,6 +1219,8 @@ impl EventCacheStore for SqliteEventCacheStore { event_id: &EventId, filters: Option<&[RelationType]>, ) -> Result)>, Self::Error> { + let _timer = timer!("method"); + let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id); let hashed_linked_chunk_id = @@ -1264,6 +1285,8 @@ impl EventCacheStore for SqliteEventCacheStore { #[instrument(skip(self, event))] async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> { + let _timer = timer!("method"); + let Some(event_id) = event.event_id() else { error!(%room_id, "Trying to save an event with no ID"); return Ok(()); @@ -1292,6 +1315,8 @@ impl EventCacheStore for SqliteEventCacheStore { content: Vec, ignore_policy: IgnoreMediaRetentionPolicy, ) -> Result<()> { + let _timer = timer!("method"); + self.media_service.add_media_content(self, request, content, ignore_policy).await } @@ -1301,6 +1326,8 @@ impl EventCacheStore for SqliteEventCacheStore { from: &MediaRequestParameters, to: &MediaRequestParameters, ) -> Result<(), Self::Error> { + let _timer = timer!("method"); + let prev_uri = self.encode_key(keys::MEDIA, from.source.unique_key()); let prev_format = self.encode_key(keys::MEDIA, from.format.unique_key()); @@ -1319,11 +1346,15 @@ impl EventCacheStore for SqliteEventCacheStore { #[instrument(skip_all)] async fn get_media_content(&self, request: &MediaRequestParameters) -> Result>> { + let _timer = timer!("method"); + self.media_service.get_media_content(self, request).await } #[instrument(skip_all)] async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> { + let _timer = timer!("method"); + let uri = self.encode_key(keys::MEDIA, request.source.unique_key()); let format = self.encode_key(keys::MEDIA, request.format.unique_key()); @@ -1338,11 +1369,15 @@ impl EventCacheStore for SqliteEventCacheStore { &self, uri: &MxcUri, ) -> Result>, Self::Error> { + let _timer = timer!("method"); + self.media_service.get_media_content_for_uri(self, uri).await } #[instrument(skip(self))] async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> { + let _timer = timer!("method"); + let uri = self.encode_key(keys::MEDIA, uri); let conn = self.write().await?; @@ -1356,11 +1391,15 @@ impl EventCacheStore for SqliteEventCacheStore { &self, policy: MediaRetentionPolicy, ) -> Result<(), Self::Error> { + let _timer = timer!("method"); + self.media_service.set_media_retention_policy(self, policy).await } #[instrument(skip_all)] fn media_retention_policy(&self) -> MediaRetentionPolicy { + let _timer = timer!("method"); + self.media_service.media_retention_policy() } @@ -1370,11 +1409,15 @@ impl EventCacheStore for SqliteEventCacheStore { request: &MediaRequestParameters, ignore_policy: 
IgnoreMediaRetentionPolicy, ) -> Result<(), Self::Error> { + let _timer = timer!("method"); + self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await } #[instrument(skip_all)] async fn clean_up_media_cache(&self) -> Result<(), Self::Error> { + let _timer = timer!("method"); + self.media_service.clean_up_media_cache(self).await } }