diff --git a/crates/matrix-sdk-common/src/tracing_timer.rs b/crates/matrix-sdk-common/src/tracing_timer.rs
index ccf478644c3..5206098a50a 100644
--- a/crates/matrix-sdk-common/src/tracing_timer.rs
+++ b/crates/matrix-sdk-common/src/tracing_timer.rs
@@ -33,7 +33,7 @@ impl std::fmt::Debug for TracingTimer {
 
 impl Drop for TracingTimer {
     fn drop(&mut self) {
-        let message = format!("{} finished in {}ms", self.id, self.start.elapsed().as_millis());
+        let message = format!("_{}_ finished in {:?}", self.id, self.start.elapsed());
 
         let enabled = tracing::level_enabled!(self.level) && {
             let interest = self.callsite.interest();
@@ -62,12 +62,8 @@ impl Drop for TracingTimer {
 }
 
 impl TracingTimer {
-    /// Create a new `TracingTimer` at the `debug` log level.
-    pub fn new_debug(
-        callsite: &'static DefaultCallsite,
-        id: String,
-        level: tracing::Level,
-    ) -> Self {
+    /// Create a new `TracingTimer`.
+    pub fn new(callsite: &'static DefaultCallsite, id: String, level: tracing::Level) -> Self {
         Self { id, callsite, start: Instant::now(), level }
     }
 }
@@ -111,7 +107,7 @@ macro_rules! timer {
             fields: []
         };
 
-        $crate::tracing_timer::TracingTimer::new_debug(&__CALLSITE, $string.into(), $level)
+        $crate::tracing_timer::TracingTimer::new(&__CALLSITE, $string.into(), $level)
     }};
 
     ($string:expr) => {
@@ -133,7 +129,7 @@ mod tests {
         let _timer_guard = timer!(tracing::Level::DEBUG, "test");
         tokio::time::sleep(ruma::time::Duration::from_millis(123)).await;
         // Displays: 2023-08-25T15:18:31.169498Z DEBUG
-        //     matrix_sdk_common::tracing_timer::tests: test finished in
+        //     matrix_sdk_common::tracing_timer::tests: _test_ finished in
         //     124ms
     }
 }
diff --git a/crates/matrix-sdk-sqlite/src/event_cache_store.rs b/crates/matrix-sdk-sqlite/src/event_cache_store.rs
index 207f4e1010b..b5a18ef7e0d 100644
--- a/crates/matrix-sdk-sqlite/src/event_cache_store.rs
+++ b/crates/matrix-sdk-sqlite/src/event_cache_store.rs
@@ -36,6 +36,7 @@ use matrix_sdk_base::{
         Position, RawChunk, Update,
     },
     media::{MediaRequestParameters, UniqueKey},
+    timer,
 };
 use matrix_sdk_store_encryption::StoreCipher;
 use ruma::{
@@ -43,8 +44,11 @@ use ruma::{
     OwnedEventId, RoomId,
 };
 use rusqlite::{params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior};
-use tokio::fs;
-use tracing::{debug, error, trace};
+use tokio::{
+    fs,
+    sync::{Mutex, OwnedMutexGuard},
+};
+use tracing::{debug, error, instrument, trace};
 
 use crate::{
     error::{Error, Result},
@@ -86,7 +90,16 @@ const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";
 #[derive(Clone)]
 pub struct SqliteEventCacheStore {
     store_cipher: Option<Arc<StoreCipher>>,
+
+    /// The pool of connections.
     pool: SqlitePool,
+
+    /// The connection reserved for write operations: a single dedicated
+    /// connection handles all writes, while the other connections of the
+    /// pool serve read operations. The lock ensures the write connection
+    /// has only one owner at a time.
+    write_connection: Arc<Mutex<SqliteAsyncConn>>,
+
     media_service: MediaService,
 }
 
@@ -114,7 +127,12 @@ impl SqliteEventCacheStore {
     }
 
     /// Open the SQLite-based event cache store with the given config.
+    #[instrument(skip(config), fields(path = ?config.path))]
     pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
+        debug!(?config);
+
+        let _timer = timer!("open_with_config");
+
         let SqliteStoreConfig { path, passphrase, pool_config, runtime_config } = config;
 
         fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;
@@ -125,7 +143,7 @@ impl SqliteEventCacheStore {
         let pool = config.create_pool(Runtime::Tokio1)?;
 
         let this = Self::open_with_pool(pool, passphrase.as_deref()).await?;
-        this.pool.get().await?.apply_runtime_config(runtime_config).await?;
+        this.write().await?.apply_runtime_config(runtime_config).await?;
 
         Ok(this)
     }
@@ -151,10 +169,21 @@ impl SqliteEventCacheStore {
         let last_media_cleanup_time = conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await?;
         media_service.restore(media_retention_policy, last_media_cleanup_time);
 
-        Ok(Self { store_cipher, pool, media_service })
+        Ok(Self {
+            store_cipher,
+            pool,
+            // Use `conn` as our selected write connection.
+            write_connection: Arc::new(Mutex::new(conn)),
+            media_service,
+        })
     }
 
-    async fn acquire(&self) -> Result<SqliteAsyncConn> {
+    /// Acquire a connection for executing read operations.
+    #[instrument(skip_all)]
+    async fn read(&self) -> Result<SqliteAsyncConn> {
+        trace!("Taking a `read` connection");
+        let _timer = timer!("connection");
+
         let connection = self.pool.get().await?;
 
         // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
@@ -166,6 +195,23 @@ impl SqliteEventCacheStore {
         Ok(connection)
     }
 
+    /// Acquire a connection for executing write operations.
+    #[instrument(skip_all)]
+    async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
+        trace!("Taking a `write` connection");
+        let _timer = timer!("connection");
+
+        let connection = self.write_connection.clone().lock_owned().await;
+
+        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
+        // support must be enabled on a per-connection basis. Execute it every
+        // time we try to get a connection, since we can't guarantee a previous
+        // connection did enable it before.
+        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;
+
+        Ok(connection)
+    }
+
     fn map_row_to_chunk(
         row: &rusqlite::Row<'_>,
     ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
@@ -412,12 +458,15 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
 impl EventCacheStore for SqliteEventCacheStore {
     type Error = Error;
 
+    #[instrument(skip(self))]
     async fn try_take_leased_lock(
         &self,
         lease_duration_ms: u32,
         key: &str,
         holder: &str,
     ) -> Result<bool> {
+        let _timer = timer!("method");
+
         let key = key.to_owned();
         let holder = holder.to_owned();
 
@@ -425,7 +474,7 @@ impl EventCacheStore for SqliteEventCacheStore {
         let expiration = now + lease_duration_ms as u64;
 
         let num_touched = self
-            .acquire()
+            .write()
             .await?
             .with_transaction(move |txn| {
                 txn.execute(
@@ -445,11 +494,14 @@ impl EventCacheStore for SqliteEventCacheStore {
         Ok(num_touched == 1)
     }
 
+    #[instrument(skip(self, updates))]
     async fn handle_linked_chunk_updates(
         &self,
         linked_chunk_id: LinkedChunkId<'_>,
         updates: Vec<Update<Event, Gap>>,
     ) -> Result<(), Self::Error> {
+        let _timer = timer!("method");
+
         // Use a single transaction throughout this function, so that either
         // all updates are applied, or none of them is.
         let hashed_linked_chunk_id =
             self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
 
         let linked_chunk_id = linked_chunk_id.to_owned();
         let this = self.clone();
 
-        with_immediate_transaction(self.acquire().await?, move |txn| {
+        with_immediate_transaction(self, move |txn| {
             for up in updates {
                 match up {
                     Update::NewItemsChunk { previous, new, next } => {
@@ -773,17 +825,20 @@ impl EventCacheStore for SqliteEventCacheStore {
         Ok(())
     }
 
+    #[instrument(skip(self))]
     async fn load_all_chunks(
         &self,
         linked_chunk_id: LinkedChunkId<'_>,
     ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
+        let _timer = timer!("method");
+
         let hashed_linked_chunk_id =
             self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
 
         let this = self.clone();
 
         let result = self
-            .acquire()
+            .read()
             .await?
             .with_transaction(move |txn| -> Result<_> {
                 let mut items = Vec::new();
@@ -814,14 +869,17 @@ impl EventCacheStore for SqliteEventCacheStore {
         Ok(result)
     }
 
+    #[instrument(skip(self))]
     async fn load_all_chunks_metadata(
         &self,
         linked_chunk_id: LinkedChunkId<'_>,
     ) -> Result<Vec<ChunkMetadata>, Self::Error> {
+        let _timer = timer!("method");
+
         let hashed_linked_chunk_id =
             self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
 
-        self.acquire()
+        self.read()
             .await?
             .with_transaction(move |txn| -> Result<_> {
                 // I'm not a DB analyst, so for my own future sanity: this query joins the
@@ -874,17 +932,20 @@ impl EventCacheStore for SqliteEventCacheStore {
             .await
     }
 
+    #[instrument(skip(self))]
     async fn load_last_chunk(
         &self,
         linked_chunk_id: LinkedChunkId<'_>,
     ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
+        let _timer = timer!("method");
+
         let hashed_linked_chunk_id =
             self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
 
         let this = self.clone();
 
         self
-            .acquire()
+            .read()
             .await?
             .with_transaction(move |txn| -> Result<_> {
                 // Find the latest chunk identifier to generate a
                 // `ChunkIdentifierGenerator`, and count the number of chunks.
@@ -966,18 +1027,21 @@ impl EventCacheStore for SqliteEventCacheStore {
             .await
     }
 
+    #[instrument(skip(self))]
     async fn load_previous_chunk(
         &self,
         linked_chunk_id: LinkedChunkId<'_>,
         before_chunk_identifier: ChunkIdentifier,
     ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
+        let _timer = timer!("method");
+
         let hashed_linked_chunk_id =
             self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
 
         let this = self.clone();
 
         self
-            .acquire()
+            .read()
             .await?
             .with_transaction(move |txn| -> Result<_> {
                 // Find the chunk before the chunk identified by `before_chunk_identifier`.
@@ -1017,8 +1081,11 @@ impl EventCacheStore for SqliteEventCacheStore {
             .await
     }
 
+    #[instrument(skip(self))]
     async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
-        self.acquire()
+        let _timer = timer!("method");
+
+        self.write()
             .await?
             .with_transaction(move |txn| {
                 // Remove all the chunks, and let cascading do its job.
@@ -1027,14 +1094,18 @@ impl EventCacheStore for SqliteEventCacheStore {
                 txn.execute("DELETE FROM events", ())
             })
             .await?;
+
         Ok(())
     }
 
+    #[instrument(skip(self, events))]
     async fn filter_duplicated_events(
         &self,
         linked_chunk_id: LinkedChunkId<'_>,
         events: Vec<OwnedEventId>,
     ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
+        let _timer = timer!("method");
+
         // If there are no events for which we want to check duplicates, we can
         // return early. It's not only an optimization: it's required, otherwise
         // the `repeat_vars` call below will panic.
@@ -1047,7 +1118,7 @@ impl EventCacheStore for SqliteEventCacheStore {
             self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
         let linked_chunk_id = linked_chunk_id.to_owned();
 
-        self.acquire()
+        self.read()
             .await?
             .with_transaction(move |txn| -> Result<_> {
                 txn.chunk_large_query_over(events, None, move |txn, events| {
@@ -1109,17 +1180,20 @@ impl EventCacheStore for SqliteEventCacheStore {
             .await
     }
 
+    #[instrument(skip(self, event_id))]
     async fn find_event(
         &self,
         room_id: &RoomId,
         event_id: &EventId,
     ) -> Result<Option<Event>, Self::Error> {
+        let _timer = timer!("method");
+
         let event_id = event_id.to_owned();
         let this = self.clone();
 
         let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
 
-        self.acquire()
+        self.read()
             .await?
             .with_transaction(move |txn| -> Result<_> {
                 let Some(event) = txn
@@ -1138,12 +1212,15 @@ impl EventCacheStore for SqliteEventCacheStore {
             .await
     }
 
+    #[instrument(skip(self, event_id, filters))]
     async fn find_event_relations(
         &self,
         room_id: &RoomId,
         event_id: &EventId,
         filters: Option<&[RelationType]>,
     ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
+        let _timer = timer!("method");
+
         let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
 
         let hashed_linked_chunk_id =
@@ -1153,7 +1230,7 @@ impl EventCacheStore for SqliteEventCacheStore {
         let filters = filters.map(ToOwned::to_owned);
         let this = self.clone();
 
-        self.acquire()
+        self.read()
             .await?
             .with_transaction(move |txn| -> Result<_> {
                 let filter_query = if let Some(filters) = compute_filters_string(filters.as_deref())
@@ -1206,7 +1283,10 @@ impl EventCacheStore for SqliteEventCacheStore {
             .await
     }
 
+    #[instrument(skip(self, event))]
     async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
+        let _timer = timer!("method");
+
         let Some(event_id) = event.event_id() else {
             error!(%room_id, "Trying to save an event with no ID");
             return Ok(());
@@ -1216,7 +1296,7 @@ impl EventCacheStore for SqliteEventCacheStore {
         let event_id = event_id.to_string();
         let encoded_event = self.encode_event(&event)?;
 
-        self.acquire()
+        self.write()
             .await?
             .with_transaction(move |txn| -> Result<_> {
                 txn.execute(
@@ -1228,27 +1308,33 @@ impl EventCacheStore for SqliteEventCacheStore {
             .await
     }
 
+    #[instrument(skip_all)]
     async fn add_media_content(
         &self,
         request: &MediaRequestParameters,
         content: Vec<u8>,
         ignore_policy: IgnoreMediaRetentionPolicy,
     ) -> Result<()> {
+        let _timer = timer!("method");
+
         self.media_service.add_media_content(self, request, content, ignore_policy).await
     }
 
+    #[instrument(skip_all)]
     async fn replace_media_key(
         &self,
         from: &MediaRequestParameters,
         to: &MediaRequestParameters,
     ) -> Result<(), Self::Error> {
+        let _timer = timer!("method");
+
         let prev_uri = self.encode_key(keys::MEDIA, from.source.unique_key());
         let prev_format = self.encode_key(keys::MEDIA, from.format.unique_key());
 
         let new_uri = self.encode_key(keys::MEDIA, to.source.unique_key());
         let new_format = self.encode_key(keys::MEDIA, to.format.unique_key());
 
-        let conn = self.acquire().await?;
+        let conn = self.write().await?;
         conn.execute(
             r#"UPDATE media SET uri = ?, format = ? WHERE uri = ?
                AND format = ?"#,
             (new_uri, new_format, prev_uri, prev_format),
@@ -1258,56 +1344,80 @@ impl EventCacheStore for SqliteEventCacheStore {
         Ok(())
     }
 
+    #[instrument(skip_all)]
     async fn get_media_content(&self, request: &MediaRequestParameters) -> Result<Option<Vec<u8>>> {
+        let _timer = timer!("method");
+
         self.media_service.get_media_content(self, request).await
     }
 
+    #[instrument(skip_all)]
     async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
+        let _timer = timer!("method");
+
         let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
         let format = self.encode_key(keys::MEDIA, request.format.unique_key());
 
-        let conn = self.acquire().await?;
+        let conn = self.write().await?;
         conn.execute("DELETE FROM media WHERE uri = ? AND format = ?", (uri, format)).await?;
 
         Ok(())
     }
 
+    #[instrument(skip(self))]
     async fn get_media_content_for_uri(
         &self,
         uri: &MxcUri,
     ) -> Result<Option<Vec<u8>>, Self::Error> {
+        let _timer = timer!("method");
+
         self.media_service.get_media_content_for_uri(self, uri).await
     }
 
+    #[instrument(skip(self))]
     async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
+        let _timer = timer!("method");
+
         let uri = self.encode_key(keys::MEDIA, uri);
 
-        let conn = self.acquire().await?;
+        let conn = self.write().await?;
         conn.execute("DELETE FROM media WHERE uri = ?", (uri,)).await?;
 
         Ok(())
     }
 
+    #[instrument(skip_all)]
     async fn set_media_retention_policy(
         &self,
         policy: MediaRetentionPolicy,
     ) -> Result<(), Self::Error> {
+        let _timer = timer!("method");
+
         self.media_service.set_media_retention_policy(self, policy).await
     }
 
+    #[instrument(skip_all)]
     fn media_retention_policy(&self) -> MediaRetentionPolicy {
+        let _timer = timer!("method");
+
         self.media_service.media_retention_policy()
     }
 
+    #[instrument(skip_all)]
    async fn set_ignore_media_retention_policy(
         &self,
         request: &MediaRequestParameters,
         ignore_policy: IgnoreMediaRetentionPolicy,
     ) -> Result<(), Self::Error> {
+        let _timer = timer!("method");
+
         self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
     }
 
+    #[instrument(skip_all)]
     async fn clean_up_media_cache(&self) -> Result<(), Self::Error> {
+        let _timer = timer!("method");
+
         self.media_service.clean_up_media_cache(self).await
     }
 }
@@ -1320,7 +1430,7 @@ impl EventCacheStoreMedia for SqliteEventCacheStore {
     async fn media_retention_policy_inner(
         &self,
     ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
-        let conn = self.acquire().await?;
+        let conn = self.read().await?;
         conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await
     }
 
@@ -1328,7 +1438,7 @@ impl EventCacheStoreMedia for SqliteEventCacheStore {
         &self,
         policy: MediaRetentionPolicy,
     ) -> Result<(), Self::Error> {
-        let conn = self.acquire().await?;
+        let conn = self.write().await?;
         conn.set_serialized_kv(keys::MEDIA_RETENTION_POLICY, policy).await?;
         Ok(())
     }
@@ -1352,7 +1462,7 @@ impl EventCacheStoreMedia for SqliteEventCacheStore {
         let format = self.encode_key(keys::MEDIA, request.format.unique_key());
         let timestamp = time_to_timestamp(last_access);
 
-        let conn = self.acquire().await?;
+        let conn = self.write().await?;
         conn.execute(
             "INSERT OR REPLACE INTO media (uri, format, data, last_access, ignore_policy) VALUES (?, ?, ?, ?, ?)",
             (uri, format, data, timestamp, ignore_policy),
@@ -1371,7 +1481,7 @@ impl EventCacheStoreMedia for SqliteEventCacheStore {
         let format = self.encode_key(keys::MEDIA, request.format.unique_key());
         let ignore_policy = ignore_policy.is_yes();
 
-        let conn = self.acquire().await?;
+        let conn = self.write().await?;
         conn.execute(
             r#"UPDATE media SET ignore_policy = ?
                WHERE uri = ? AND format = ?"#,
             (ignore_policy, uri, format),
@@ -1390,7 +1500,7 @@ impl EventCacheStoreMedia for SqliteEventCacheStore {
         let format = self.encode_key(keys::MEDIA, request.format.unique_key());
         let timestamp = time_to_timestamp(current_time);
 
-        let conn = self.acquire().await?;
+        let conn = self.write().await?;
         let data = conn
             .with_transaction::<_, rusqlite::Error, _>(move |txn| {
                 // Update the last access.
@@ -1421,7 +1531,7 @@ impl EventCacheStoreMedia for SqliteEventCacheStore {
         let uri = self.encode_key(keys::MEDIA, uri);
         let timestamp = time_to_timestamp(current_time);
 
-        let conn = self.acquire().await?;
+        let conn = self.write().await?;
         let data = conn
             .with_transaction::<_, rusqlite::Error, _>(move |txn| {
                 // Update the last access.
@@ -1451,7 +1561,7 @@ impl EventCacheStoreMedia for SqliteEventCacheStore {
             return Ok(());
         }
 
-        let conn = self.acquire().await?;
+        let conn = self.write().await?;
         let removed = conn
             .with_transaction::<_, Error, _>(move |txn| {
                 let mut removed = false;
@@ -1570,7 +1680,7 @@ impl EventCacheStoreMedia for SqliteEventCacheStore {
     }
 
     async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
-        let conn = self.acquire().await?;
+        let conn = self.read().await?;
         conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await
     }
 }
@@ -1583,33 +1693,35 @@
 async fn with_immediate_transaction<
     T: Send + 'static,
     F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
 >(
-    conn: SqliteAsyncConn,
+    this: &SqliteEventCacheStore,
     f: F,
 ) -> Result<T, Error> {
-    conn.interact(move |conn| -> Result<T, Error> {
-        // Start the transaction in IMMEDIATE mode since all updates may cause writes,
-        // to avoid read transactions upgrading to write mode and causing
-        // SQLITE_BUSY errors. See also: https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
-        conn.set_transaction_behavior(TransactionBehavior::Immediate);
-
-        let code = || -> Result<T, Error> {
-            let txn = conn.transaction()?;
-            let res = f(&txn)?;
-            txn.commit()?;
-            Ok(res)
-        };
-
-        let res = code();
-
-        // Reset the transaction behavior to use Deferred, after this transaction has
-        // been run, whether it was successful or not.
-        conn.set_transaction_behavior(TransactionBehavior::Deferred);
-
-        res
-    })
-    .await
-    // SAFETY: same logic as in [`deadpool::managed::Object::with_transaction`].
-    .unwrap()
+    this.write()
+        .await?
+        .interact(move |conn| -> Result<T, Error> {
+            // Start the transaction in IMMEDIATE mode since all updates may cause writes,
+            // to avoid read transactions upgrading to write mode and causing
+            // SQLITE_BUSY errors. See also: https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
+            conn.set_transaction_behavior(TransactionBehavior::Immediate);
+
+            let code = || -> Result<T, Error> {
+                let txn = conn.transaction()?;
+                let res = f(&txn)?;
+                txn.commit()?;
+                Ok(res)
+            };
+
+            let res = code();
+
+            // Reset the transaction behavior to use Deferred, after this transaction has
+            // been run, whether it was successful or not.
+            conn.set_transaction_behavior(TransactionBehavior::Deferred);
+
+            res
+        })
+        .await
+        // SAFETY: same logic as in [`deadpool::managed::Object::with_transaction`].
+        .unwrap()
 }
 
 fn insert_chunk(
@@ -1716,7 +1828,7 @@ mod tests {
     async fn get_event_cache_store_content_sorted_by_last_access(
         event_cache_store: &SqliteEventCacheStore,
     ) -> Vec<Vec<u8>> {
-        let sqlite_db = event_cache_store.acquire().await.expect("accessing sqlite db failed");
+        let sqlite_db = event_cache_store.read().await.expect("accessing sqlite db failed");
         sqlite_db
             .prepare("SELECT data FROM media ORDER BY last_access DESC", |mut stmt| {
                 stmt.query(())?.mapped(|row| row.get(0)).collect()
@@ -2006,7 +2118,7 @@ mod tests {
 
         // Check that cascading worked. Yes, SQLite, I doubt you.
         let gaps = store
-            .acquire()
+            .read()
             .await
             .unwrap()
             .with_transaction(|txn| -> rusqlite::Result<_> {
@@ -2128,7 +2240,7 @@ mod tests {
 
         // Make sure the positions have been updated for the remaining events.
         let num_rows: u64 = store
-            .acquire()
+            .read()
             .await
             .unwrap()
             .with_transaction(move |txn| {
@@ -2277,7 +2389,7 @@ mod tests {
 
         // Check that cascading worked. Yes, SQLite, I doubt you.
         store
-            .acquire()
+            .read()
             .await
             .unwrap()
             .with_transaction(|txn| -> rusqlite::Result<_> {
diff --git a/crates/matrix-sdk-sqlite/src/lib.rs b/crates/matrix-sdk-sqlite/src/lib.rs
index a8ab210d121..7130e15039e 100644
--- a/crates/matrix-sdk-sqlite/src/lib.rs
+++ b/crates/matrix-sdk-sqlite/src/lib.rs
@@ -25,6 +25,7 @@ mod event_cache_store;
 mod state_store;
 mod utils;
 use std::{
+    cmp::max,
     fmt,
     path::{Path, PathBuf},
 };
@@ -66,6 +67,12 @@ impl fmt::Debug for SqliteStoreConfig {
     }
 }
 
+/// The minimum size of the connection pool.
+///
+/// We need at least two connections: one for write operations, and one for
+/// read operations.
+const POOL_MINIMUM_SIZE: usize = 2;
+
 impl SqliteStoreConfig {
     /// Create a new [`SqliteStoreConfig`] with a path representing the
     /// directory containing the store database.
@@ -76,7 +83,7 @@ impl SqliteStoreConfig {
         Self {
             path: path.as_ref().to_path_buf(),
             passphrase: None,
-            pool_config: PoolConfig::new(num_cpus::get_physical() * 4),
+            pool_config: PoolConfig::new(max(POOL_MINIMUM_SIZE, num_cpus::get_physical() * 4)),
             runtime_config: RuntimeConfig::default(),
         }
     }
@@ -122,7 +129,7 @@ impl SqliteStoreConfig {
     ///
     /// See [`deadpool_sqlite::PoolConfig::max_size`] to learn more.
     pub fn pool_max_size(mut self, max_size: usize) -> Self {
-        self.pool_config.max_size = max_size;
+        self.pool_config.max_size = max(POOL_MINIMUM_SIZE, max_size);
         self
     }
 
@@ -218,7 +225,7 @@ mod tests {
         path::{Path, PathBuf},
     };
 
-    use super::SqliteStoreConfig;
+    use super::{SqliteStoreConfig, POOL_MINIMUM_SIZE};
 
     #[test]
     fn test_new() {
@@ -263,4 +270,11 @@ mod tests {
 
         assert_eq!(store_config.path, PathBuf::from("bar"));
     }
+
+    #[test]
+    fn test_pool_size_has_a_minimum() {
+        let store_config = SqliteStoreConfig::new(Path::new("foo")).pool_max_size(1);
+
+        assert_eq!(store_config.pool_config.max_size, POOL_MINIMUM_SIZE);
+    }
 }
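
Reviewer note (appended after the patch, not part of it): the heart of this change is the split between a single dedicated write connection and a pool of read connections. Below is a minimal standalone sketch of that discipline; `Connection` and `Store` are hypothetical stand-ins for `SqliteAsyncConn` and `SqliteEventCacheStore`, and only the `tokio` crate is assumed.

```rust
use std::sync::Arc;
use tokio::sync::{Mutex, OwnedMutexGuard};

#[derive(Debug)]
struct Connection(&'static str);

#[derive(Clone)]
struct Store {
    // Read connections would come from a real pool (e.g. deadpool); a Vec
    // stands in for the demo.
    read_pool: Arc<Vec<Connection>>,
    // The single connection reserved for writes, guarded by an async mutex.
    write_connection: Arc<Mutex<Connection>>,
}

impl Store {
    fn read(&self) -> &Connection {
        // A real pool hands out any idle connection; take the first one here.
        &self.read_pool[0]
    }

    async fn write(&self) -> OwnedMutexGuard<Connection> {
        // `lock_owned` needs an `Arc`-ed mutex and returns a guard that can be
        // held across `.await` points and moved into blocking closures.
        self.write_connection.clone().lock_owned().await
    }
}

#[tokio::main]
async fn main() {
    let store = Store {
        read_pool: Arc::new(vec![Connection("read-1"), Connection("read-2")]),
        write_connection: Arc::new(Mutex::new(Connection("write"))),
    };

    // Writes serialize on the mutex; reads proceed concurrently.
    let guard = store.write().await;
    println!("writing via {:?}", *guard);
    println!("reading via {:?}", store.read());
}
```

With a single writer, SQLite's one-writer-at-a-time constraint is enforced in the application instead of being discovered at runtime as `SQLITE_BUSY`.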
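
The `timer!` changes in `tracing_timer.rs` rely on the RAII guard pattern: the elapsed time is logged when the guard is dropped at the end of the scope. A rough standalone equivalent, assuming `tracing` and `tracing-subscriber` as dependencies (the guard type and message are illustrative, not the crate's actual implementation):

```rust
use std::time::Instant;

struct TimerGuard {
    id: &'static str,
    start: Instant,
}

impl Drop for TimerGuard {
    fn drop(&mut self) {
        // Mirrors the patched message format: `_<id>_ finished in <duration>`.
        tracing::debug!("_{}_ finished in {:?}", self.id, self.start.elapsed());
    }
}

fn main() {
    tracing_subscriber::fmt().with_max_level(tracing::Level::DEBUG).init();

    let _timer = TimerGuard { id: "expensive_work", start: Instant::now() };
    std::thread::sleep(std::time::Duration::from_millis(50));
    // `_timer` is dropped here and the debug line is emitted.
}
```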
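
Finally, `with_immediate_transaction` keeps its `BEGIN IMMEDIATE` rationale: taking the write lock up front avoids a deferred transaction upgrading to write mode mid-flight and failing with `SQLITE_BUSY`. A minimal blocking sketch of the same idea with plain `rusqlite` (table and values are made up):

```rust
use rusqlite::{Connection, TransactionBehavior};

fn main() -> rusqlite::Result<()> {
    let mut conn = Connection::open_in_memory()?;
    conn.execute_batch("CREATE TABLE kv (k TEXT PRIMARY KEY, v TEXT);")?;

    // IMMEDIATE grabs the write lock at BEGIN, so a concurrent writer fails
    // fast instead of upgrading a read transaction halfway through.
    let txn = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
    txn.execute("INSERT INTO kv (k, v) VALUES (?1, ?2)", ("key", "value"))?;
    txn.commit()?;

    Ok(())
}
```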