Skip to content

Commit f03934b

Browse files
committed
feat(sqlite): SqliteEventCacheStore has 1 write connection.
Until now, `SqliteEventCacheStore` manages a pool of connections. A connection is fetched from this pool and operations are executed on it, regardless whether these are read operations or write operations. We are seeing more and more _database is busy_ errors. We believe this is because too many write operations are executed concurrently. The solution to solve this is to use multiple connections for read operations, and a single connection for write operations. That way, concurrent writings are no longer a thing, and we hope it will reduce the number of _database is busy_ errors to zero. That's our guess. This patch does that. When the pool of connections is created, a connection is elected as the `write_connection`. To get a connection for read operations, one has to use the new `SqliteEventCacheStore::read` method (it replaces the `acquire` method). To get a connection for write operations, one has to use the new `SQliteEventCacheStore::write` method. It returns a `OwnedMutexGuard` from an async `Mutex`. All callers that want to do write operations on this store have to wait their turn, this `Mutex` is fair, and the first to wait on the lock is the first that will take the lock (FIFO). It guarantees the execution ordering the code expects. The rest of the patch updates all spots where `acquire` was used and replaces them by `read()` or `write()`. A particular care was made to see if other places are using `SqliteEventCacheStore::pool` directly. No place remains except in `read()` and `write()`.
1 parent 014ee98 commit f03934b

File tree

1 file changed

+89
-55
lines changed

1 file changed

+89
-55
lines changed

crates/matrix-sdk-sqlite/src/event_cache_store.rs

Lines changed: 89 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,10 @@ use ruma::{
4343
OwnedEventId, RoomId,
4444
};
4545
use rusqlite::{params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior};
46-
use tokio::fs;
46+
use tokio::{
47+
fs,
48+
sync::{Mutex, OwnedMutexGuard},
49+
};
4750
use tracing::{debug, error, trace};
4851

4952
use crate::{
@@ -86,7 +89,16 @@ const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";
8689
#[derive(Clone)]
8790
pub struct SqliteEventCacheStore {
8891
store_cipher: Option<Arc<StoreCipher>>,
92+
93+
/// The pool of connections.
8994
pool: SqlitePool,
95+
96+
/// We make the difference between connections for read operations, and for
97+
/// write operations. We keep a single connection apart from write
98+
/// operations. All other connections are used for read operations. The
99+
/// lock is used to ensure there is one owner at a time.
100+
write_connection: Arc<Mutex<SqliteAsyncConn>>,
101+
90102
media_service: MediaService,
91103
}
92104

@@ -125,7 +137,7 @@ impl SqliteEventCacheStore {
125137
let pool = config.create_pool(Runtime::Tokio1)?;
126138

127139
let this = Self::open_with_pool(pool, passphrase.as_deref()).await?;
128-
this.pool.get().await?.apply_runtime_config(runtime_config).await?;
140+
this.write().await?.apply_runtime_config(runtime_config).await?;
129141

130142
Ok(this)
131143
}
@@ -151,10 +163,17 @@ impl SqliteEventCacheStore {
151163
let last_media_cleanup_time = conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await?;
152164
media_service.restore(media_retention_policy, last_media_cleanup_time);
153165

154-
Ok(Self { store_cipher, pool, media_service })
166+
Ok(Self {
167+
store_cipher,
168+
pool,
169+
// Use `conn` as our selected write connections.
170+
write_connection: Arc::new(Mutex::new(conn)),
171+
media_service,
172+
})
155173
}
156174

157-
async fn acquire(&self) -> Result<SqliteAsyncConn> {
175+
// Acquire a connection for executing read operations.
176+
async fn read(&self) -> Result<SqliteAsyncConn> {
158177
let connection = self.pool.get().await?;
159178

160179
// Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
@@ -166,6 +185,19 @@ impl SqliteEventCacheStore {
166185
Ok(connection)
167186
}
168187

188+
// Acquire a connection for executing write operations.
189+
async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
190+
let connection = self.write_connection.clone().lock_owned().await;
191+
192+
// Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
193+
// support must be enabled on a per-connection basis. Execute it every
194+
// time we try to get a connection, since we can't guarantee a previous
195+
// connection did enable it before.
196+
connection.execute_batch("PRAGMA foreign_keys = ON;").await?;
197+
198+
Ok(connection)
199+
}
200+
169201
fn map_row_to_chunk(
170202
row: &rusqlite::Row<'_>,
171203
) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
@@ -425,7 +457,7 @@ impl EventCacheStore for SqliteEventCacheStore {
425457
let expiration = now + lease_duration_ms as u64;
426458

427459
let num_touched = self
428-
.acquire()
460+
.write()
429461
.await?
430462
.with_transaction(move |txn| {
431463
txn.execute(
@@ -457,7 +489,7 @@ impl EventCacheStore for SqliteEventCacheStore {
457489
let linked_chunk_id = linked_chunk_id.to_owned();
458490
let this = self.clone();
459491

460-
with_immediate_transaction(self.acquire().await?, move |txn| {
492+
with_immediate_transaction(self, move |txn| {
461493
for up in updates {
462494
match up {
463495
Update::NewItemsChunk { previous, new, next } => {
@@ -783,7 +815,7 @@ impl EventCacheStore for SqliteEventCacheStore {
783815
let this = self.clone();
784816

785817
let result = self
786-
.acquire()
818+
.read()
787819
.await?
788820
.with_transaction(move |txn| -> Result<_> {
789821
let mut items = Vec::new();
@@ -821,7 +853,7 @@ impl EventCacheStore for SqliteEventCacheStore {
821853
let hashed_linked_chunk_id =
822854
self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
823855

824-
self.acquire()
856+
self.read()
825857
.await?
826858
.with_transaction(move |txn| -> Result<_> {
827859
// I'm not a DB analyst, so for my own future sanity: this query joins the
@@ -884,7 +916,7 @@ impl EventCacheStore for SqliteEventCacheStore {
884916
let this = self.clone();
885917

886918
self
887-
.acquire()
919+
.read()
888920
.await?
889921
.with_transaction(move |txn| -> Result<_> {
890922
// Find the latest chunk identifier to generate a `ChunkIdentifierGenerator`, and count the number of chunks.
@@ -977,7 +1009,7 @@ impl EventCacheStore for SqliteEventCacheStore {
9771009
let this = self.clone();
9781010

9791011
self
980-
.acquire()
1012+
.read()
9811013
.await?
9821014
.with_transaction(move |txn| -> Result<_> {
9831015
// Find the chunk before the chunk identified by `before_chunk_identifier`.
@@ -1018,7 +1050,7 @@ impl EventCacheStore for SqliteEventCacheStore {
10181050
}
10191051

10201052
async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
1021-
self.acquire()
1053+
self.write()
10221054
.await?
10231055
.with_transaction(move |txn| {
10241056
// Remove all the chunks, and let cascading do its job.
@@ -1047,7 +1079,7 @@ impl EventCacheStore for SqliteEventCacheStore {
10471079
self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
10481080
let linked_chunk_id = linked_chunk_id.to_owned();
10491081

1050-
self.acquire()
1082+
self.read()
10511083
.await?
10521084
.with_transaction(move |txn| -> Result<_> {
10531085
txn.chunk_large_query_over(events, None, move |txn, events| {
@@ -1119,7 +1151,7 @@ impl EventCacheStore for SqliteEventCacheStore {
11191151

11201152
let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
11211153

1122-
self.acquire()
1154+
self.read()
11231155
.await?
11241156
.with_transaction(move |txn| -> Result<_> {
11251157
let Some(event) = txn
@@ -1153,7 +1185,7 @@ impl EventCacheStore for SqliteEventCacheStore {
11531185
let filters = filters.map(ToOwned::to_owned);
11541186
let store = self.clone();
11551187

1156-
self.acquire()
1188+
self.read()
11571189
.await?
11581190
.with_transaction(move |txn| -> Result<_> {
11591191
find_event_relations_transaction(
@@ -1178,7 +1210,7 @@ impl EventCacheStore for SqliteEventCacheStore {
11781210
let event_id = event_id.to_string();
11791211
let encoded_event = self.encode_event(&event)?;
11801212

1181-
self.acquire()
1213+
self.write()
11821214
.await?
11831215
.with_transaction(move |txn| -> Result<_> {
11841216
txn.execute(
@@ -1210,7 +1242,7 @@ impl EventCacheStore for SqliteEventCacheStore {
12101242
let new_uri = self.encode_key(keys::MEDIA, to.source.unique_key());
12111243
let new_format = self.encode_key(keys::MEDIA, to.format.unique_key());
12121244

1213-
let conn = self.acquire().await?;
1245+
let conn = self.write().await?;
12141246
conn.execute(
12151247
r#"UPDATE media SET uri = ?, format = ? WHERE uri = ? AND format = ?"#,
12161248
(new_uri, new_format, prev_uri, prev_format),
@@ -1228,7 +1260,7 @@ impl EventCacheStore for SqliteEventCacheStore {
12281260
let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
12291261
let format = self.encode_key(keys::MEDIA, request.format.unique_key());
12301262

1231-
let conn = self.acquire().await?;
1263+
let conn = self.write().await?;
12321264
conn.execute("DELETE FROM media WHERE uri = ? AND format = ?", (uri, format)).await?;
12331265

12341266
Ok(())
@@ -1244,7 +1276,7 @@ impl EventCacheStore for SqliteEventCacheStore {
12441276
async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
12451277
let uri = self.encode_key(keys::MEDIA, uri);
12461278

1247-
let conn = self.acquire().await?;
1279+
let conn = self.write().await?;
12481280
conn.execute("DELETE FROM media WHERE uri = ?", (uri,)).await?;
12491281

12501282
Ok(())
@@ -1282,15 +1314,15 @@ impl EventCacheStoreMedia for SqliteEventCacheStore {
12821314
async fn media_retention_policy_inner(
12831315
&self,
12841316
) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
1285-
let conn = self.acquire().await?;
1317+
let conn = self.read().await?;
12861318
conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await
12871319
}
12881320

12891321
async fn set_media_retention_policy_inner(
12901322
&self,
12911323
policy: MediaRetentionPolicy,
12921324
) -> Result<(), Self::Error> {
1293-
let conn = self.acquire().await?;
1325+
let conn = self.write().await?;
12941326
conn.set_serialized_kv(keys::MEDIA_RETENTION_POLICY, policy).await?;
12951327
Ok(())
12961328
}
@@ -1314,7 +1346,7 @@ impl EventCacheStoreMedia for SqliteEventCacheStore {
13141346
let format = self.encode_key(keys::MEDIA, request.format.unique_key());
13151347
let timestamp = time_to_timestamp(last_access);
13161348

1317-
let conn = self.acquire().await?;
1349+
let conn = self.write().await?;
13181350
conn.execute(
13191351
"INSERT OR REPLACE INTO media (uri, format, data, last_access, ignore_policy) VALUES (?, ?, ?, ?, ?)",
13201352
(uri, format, data, timestamp, ignore_policy),
@@ -1333,7 +1365,7 @@ impl EventCacheStoreMedia for SqliteEventCacheStore {
13331365
let format = self.encode_key(keys::MEDIA, request.format.unique_key());
13341366
let ignore_policy = ignore_policy.is_yes();
13351367

1336-
let conn = self.acquire().await?;
1368+
let conn = self.write().await?;
13371369
conn.execute(
13381370
r#"UPDATE media SET ignore_policy = ? WHERE uri = ? AND format = ?"#,
13391371
(ignore_policy, uri, format),
@@ -1352,7 +1384,7 @@ impl EventCacheStoreMedia for SqliteEventCacheStore {
13521384
let format = self.encode_key(keys::MEDIA, request.format.unique_key());
13531385
let timestamp = time_to_timestamp(current_time);
13541386

1355-
let conn = self.acquire().await?;
1387+
let conn = self.write().await?;
13561388
let data = conn
13571389
.with_transaction::<_, rusqlite::Error, _>(move |txn| {
13581390
// Update the last access.
@@ -1383,7 +1415,7 @@ impl EventCacheStoreMedia for SqliteEventCacheStore {
13831415
let uri = self.encode_key(keys::MEDIA, uri);
13841416
let timestamp = time_to_timestamp(current_time);
13851417

1386-
let conn = self.acquire().await?;
1418+
let conn = self.write().await?;
13871419
let data = conn
13881420
.with_transaction::<_, rusqlite::Error, _>(move |txn| {
13891421
// Update the last access.
@@ -1413,7 +1445,7 @@ impl EventCacheStoreMedia for SqliteEventCacheStore {
14131445
return Ok(());
14141446
}
14151447

1416-
let conn = self.acquire().await?;
1448+
let conn = self.write().await?;
14171449
let removed = conn
14181450
.with_transaction::<_, Error, _>(move |txn| {
14191451
let mut removed = false;
@@ -1532,7 +1564,7 @@ impl EventCacheStoreMedia for SqliteEventCacheStore {
15321564
}
15331565

15341566
async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
1535-
let conn = self.acquire().await?;
1567+
let conn = self.read().await?;
15361568
conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await
15371569
}
15381570
}
@@ -1630,33 +1662,35 @@ async fn with_immediate_transaction<
16301662
T: Send + 'static,
16311663
F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
16321664
>(
1633-
conn: SqliteAsyncConn,
1665+
this: &SqliteEventCacheStore,
16341666
f: F,
16351667
) -> Result<T, Error> {
1636-
conn.interact(move |conn| -> Result<T, Error> {
1637-
// Start the transaction in IMMEDIATE mode since all updates may cause writes,
1638-
// to avoid read transactions upgrading to write mode and causing
1639-
// SQLITE_BUSY errors. See also: https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
1640-
conn.set_transaction_behavior(TransactionBehavior::Immediate);
1641-
1642-
let code = || -> Result<T, Error> {
1643-
let txn = conn.transaction()?;
1644-
let res = f(&txn)?;
1645-
txn.commit()?;
1646-
Ok(res)
1647-
};
1648-
1649-
let res = code();
1650-
1651-
// Reset the transaction behavior to use Deferred, after this transaction has
1652-
// been run, whether it was successful or not.
1653-
conn.set_transaction_behavior(TransactionBehavior::Deferred);
1654-
1655-
res
1656-
})
1657-
.await
1658-
// SAFETY: same logic as in [`deadpool::managed::Object::with_transaction`].`
1659-
.unwrap()
1668+
this.write()
1669+
.await?
1670+
.interact(move |conn| -> Result<T, Error> {
1671+
// Start the transaction in IMMEDIATE mode since all updates may cause writes,
1672+
// to avoid read transactions upgrading to write mode and causing
1673+
// SQLITE_BUSY errors. See also: https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
1674+
conn.set_transaction_behavior(TransactionBehavior::Immediate);
1675+
1676+
let code = || -> Result<T, Error> {
1677+
let txn = conn.transaction()?;
1678+
let res = f(&txn)?;
1679+
txn.commit()?;
1680+
Ok(res)
1681+
};
1682+
1683+
let res = code();
1684+
1685+
// Reset the transaction behavior to use Deferred, after this transaction has
1686+
// been run, whether it was successful or not.
1687+
conn.set_transaction_behavior(TransactionBehavior::Deferred);
1688+
1689+
res
1690+
})
1691+
.await
1692+
// SAFETY: same logic as in [`deadpool::managed::Object::with_transaction`].`
1693+
.unwrap()
16601694
}
16611695

16621696
fn insert_chunk(
@@ -1763,7 +1797,7 @@ mod tests {
17631797
async fn get_event_cache_store_content_sorted_by_last_access(
17641798
event_cache_store: &SqliteEventCacheStore,
17651799
) -> Vec<Vec<u8>> {
1766-
let sqlite_db = event_cache_store.acquire().await.expect("accessing sqlite db failed");
1800+
let sqlite_db = event_cache_store.read().await.expect("accessing sqlite db failed");
17671801
sqlite_db
17681802
.prepare("SELECT data FROM media ORDER BY last_access DESC", |mut stmt| {
17691803
stmt.query(())?.mapped(|row| row.get(0)).collect()
@@ -2053,7 +2087,7 @@ mod tests {
20532087

20542088
// Check that cascading worked. Yes, SQLite, I doubt you.
20552089
let gaps = store
2056-
.acquire()
2090+
.read()
20572091
.await
20582092
.unwrap()
20592093
.with_transaction(|txn| -> rusqlite::Result<_> {
@@ -2175,7 +2209,7 @@ mod tests {
21752209

21762210
// Make sure the position have been updated for the remaining events.
21772211
let num_rows: u64 = store
2178-
.acquire()
2212+
.read()
21792213
.await
21802214
.unwrap()
21812215
.with_transaction(move |txn| {
@@ -2324,7 +2358,7 @@ mod tests {
23242358

23252359
// Check that cascading worked. Yes, SQLite, I doubt you.
23262360
store
2327-
.acquire()
2361+
.read()
23282362
.await
23292363
.unwrap()
23302364
.with_transaction(|txn| -> rusqlite::Result<_> {

0 commit comments

Comments
 (0)