Skip to content

Commit e854a56

Browse files
committed
feat(sqlite): SqliteEventCacheStore has 1 write connection.
Until now, `SqliteEventCacheStore` manages a pool of connections. A connection is fetched from this pool and operations are executed on it, regardless whether these are read operations or write operations. We are seeing more and more _database is busy_ errors. We believe this is because too many write operations are executed concurrently. The solution to solve this is to use multiple connections for read operations, and a single connection for write operations. That way, concurrent writings are no longer a thing, and we hope it will reduce the number of _database is busy_ errors to zero. That's our guess. This patch does that. When the pool of connections is created, a connection is elected as the `write_connection`. To get a connection for read operations, one has to use the new `SqliteEventCacheStore::read` method (it replaces the `acquire` method). To get a connection for write operations, one has to use the new `SQliteEventCacheStore::write` method. It returns a `OwnedMutexGuard` from an async `Mutex`. All callers that want to do write operations on this store have to wait their turn, this `Mutex` is fair, and the first to wait on the lock is the first that will take the lock (FIFO). It guarantees the execution ordering the code expects. The rest of the patch updates all spots where `acquire` was used and replaces them by `read()` or `write()`. A particular care was made to see if other places are using `SqliteEventCacheStore::pool` directly. No place remains except in `read()` and `write()`.
1 parent 01641cb commit e854a56

File tree

1 file changed

+89
-55
lines changed

1 file changed

+89
-55
lines changed

crates/matrix-sdk-sqlite/src/event_cache_store.rs

Lines changed: 89 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,10 @@ use ruma::{
4343
OwnedEventId, RoomId,
4444
};
4545
use rusqlite::{params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior};
46-
use tokio::fs;
46+
use tokio::{
47+
fs,
48+
sync::{Mutex, OwnedMutexGuard},
49+
};
4750
use tracing::{debug, error, trace};
4851

4952
use crate::{
@@ -86,7 +89,16 @@ const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";
8689
#[derive(Clone)]
8790
pub struct SqliteEventCacheStore {
8891
store_cipher: Option<Arc<StoreCipher>>,
92+
93+
/// The pool of connections.
8994
pool: SqlitePool,
95+
96+
/// We make the difference between connections for read operations, and for
97+
/// write operations. We keep a single connection apart from write
98+
/// operations. All other connections are used for read operations. The
99+
/// lock is used to ensure there is one owner at a time.
100+
write_connection: Arc<Mutex<SqliteAsyncConn>>,
101+
90102
media_service: MediaService,
91103
}
92104

@@ -125,7 +137,7 @@ impl SqliteEventCacheStore {
125137
let pool = config.create_pool(Runtime::Tokio1)?;
126138

127139
let this = Self::open_with_pool(pool, passphrase.as_deref()).await?;
128-
this.pool.get().await?.apply_runtime_config(runtime_config).await?;
140+
this.write().await?.apply_runtime_config(runtime_config).await?;
129141

130142
Ok(this)
131143
}
@@ -151,10 +163,17 @@ impl SqliteEventCacheStore {
151163
let last_media_cleanup_time = conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await?;
152164
media_service.restore(media_retention_policy, last_media_cleanup_time);
153165

154-
Ok(Self { store_cipher, pool, media_service })
166+
Ok(Self {
167+
store_cipher,
168+
pool,
169+
// Use `conn` as our selected write connections.
170+
write_connection: Arc::new(Mutex::new(conn)),
171+
media_service,
172+
})
155173
}
156174

157-
async fn acquire(&self) -> Result<SqliteAsyncConn> {
175+
// Acquire a connection for executing read operations.
176+
async fn read(&self) -> Result<SqliteAsyncConn> {
158177
let connection = self.pool.get().await?;
159178

160179
// Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
@@ -166,6 +185,19 @@ impl SqliteEventCacheStore {
166185
Ok(connection)
167186
}
168187

188+
// Acquire a connection for executing write operations.
189+
async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
190+
let connection = self.write_connection.clone().lock_owned().await;
191+
192+
// Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
193+
// support must be enabled on a per-connection basis. Execute it every
194+
// time we try to get a connection, since we can't guarantee a previous
195+
// connection did enable it before.
196+
connection.execute_batch("PRAGMA foreign_keys = ON;").await?;
197+
198+
Ok(connection)
199+
}
200+
169201
fn map_row_to_chunk(
170202
row: &rusqlite::Row<'_>,
171203
) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
@@ -425,7 +457,7 @@ impl EventCacheStore for SqliteEventCacheStore {
425457
let expiration = now + lease_duration_ms as u64;
426458

427459
let num_touched = self
428-
.acquire()
460+
.write()
429461
.await?
430462
.with_transaction(move |txn| {
431463
txn.execute(
@@ -457,7 +489,7 @@ impl EventCacheStore for SqliteEventCacheStore {
457489
let linked_chunk_id = linked_chunk_id.to_owned();
458490
let this = self.clone();
459491

460-
with_immediate_transaction(self.acquire().await?, move |txn| {
492+
with_immediate_transaction(self, move |txn| {
461493
for up in updates {
462494
match up {
463495
Update::NewItemsChunk { previous, new, next } => {
@@ -783,7 +815,7 @@ impl EventCacheStore for SqliteEventCacheStore {
783815
let this = self.clone();
784816

785817
let result = self
786-
.acquire()
818+
.read()
787819
.await?
788820
.with_transaction(move |txn| -> Result<_> {
789821
let mut items = Vec::new();
@@ -821,7 +853,7 @@ impl EventCacheStore for SqliteEventCacheStore {
821853
let hashed_linked_chunk_id =
822854
self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
823855

824-
self.acquire()
856+
self.read()
825857
.await?
826858
.with_transaction(move |txn| -> Result<_> {
827859
// I'm not a DB analyst, so for my own future sanity: this query joins the
@@ -884,7 +916,7 @@ impl EventCacheStore for SqliteEventCacheStore {
884916
let this = self.clone();
885917

886918
self
887-
.acquire()
919+
.read()
888920
.await?
889921
.with_transaction(move |txn| -> Result<_> {
890922
// Find the latest chunk identifier to generate a `ChunkIdentifierGenerator`, and count the number of chunks.
@@ -977,7 +1009,7 @@ impl EventCacheStore for SqliteEventCacheStore {
9771009
let this = self.clone();
9781010

9791011
self
980-
.acquire()
1012+
.read()
9811013
.await?
9821014
.with_transaction(move |txn| -> Result<_> {
9831015
// Find the chunk before the chunk identified by `before_chunk_identifier`.
@@ -1018,7 +1050,7 @@ impl EventCacheStore for SqliteEventCacheStore {
10181050
}
10191051

10201052
async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
1021-
self.acquire()
1053+
self.write()
10221054
.await?
10231055
.with_transaction(move |txn| {
10241056
// Remove all the chunks, and let cascading do its job.
@@ -1047,7 +1079,7 @@ impl EventCacheStore for SqliteEventCacheStore {
10471079
self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
10481080
let linked_chunk_id = linked_chunk_id.to_owned();
10491081

1050-
self.acquire()
1082+
self.read()
10511083
.await?
10521084
.with_transaction(move |txn| -> Result<_> {
10531085
txn.chunk_large_query_over(events, None, move |txn, events| {
@@ -1119,7 +1151,7 @@ impl EventCacheStore for SqliteEventCacheStore {
11191151

11201152
let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
11211153

1122-
self.acquire()
1154+
self.read()
11231155
.await?
11241156
.with_transaction(move |txn| -> Result<_> {
11251157
let Some(event) = txn
@@ -1153,7 +1185,7 @@ impl EventCacheStore for SqliteEventCacheStore {
11531185
let filters = filters.map(ToOwned::to_owned);
11541186
let this = self.clone();
11551187

1156-
self.acquire()
1188+
self.read()
11571189
.await?
11581190
.with_transaction(move |txn| -> Result<_> {
11591191
let filter_query = if let Some(filters) = compute_filters_string(filters.as_deref())
@@ -1216,7 +1248,7 @@ impl EventCacheStore for SqliteEventCacheStore {
12161248
let event_id = event_id.to_string();
12171249
let encoded_event = self.encode_event(&event)?;
12181250

1219-
self.acquire()
1251+
self.write()
12201252
.await?
12211253
.with_transaction(move |txn| -> Result<_> {
12221254
txn.execute(
@@ -1248,7 +1280,7 @@ impl EventCacheStore for SqliteEventCacheStore {
12481280
let new_uri = self.encode_key(keys::MEDIA, to.source.unique_key());
12491281
let new_format = self.encode_key(keys::MEDIA, to.format.unique_key());
12501282

1251-
let conn = self.acquire().await?;
1283+
let conn = self.write().await?;
12521284
conn.execute(
12531285
r#"UPDATE media SET uri = ?, format = ? WHERE uri = ? AND format = ?"#,
12541286
(new_uri, new_format, prev_uri, prev_format),
@@ -1266,7 +1298,7 @@ impl EventCacheStore for SqliteEventCacheStore {
12661298
let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
12671299
let format = self.encode_key(keys::MEDIA, request.format.unique_key());
12681300

1269-
let conn = self.acquire().await?;
1301+
let conn = self.write().await?;
12701302
conn.execute("DELETE FROM media WHERE uri = ? AND format = ?", (uri, format)).await?;
12711303

12721304
Ok(())
@@ -1282,7 +1314,7 @@ impl EventCacheStore for SqliteEventCacheStore {
12821314
async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
12831315
let uri = self.encode_key(keys::MEDIA, uri);
12841316

1285-
let conn = self.acquire().await?;
1317+
let conn = self.write().await?;
12861318
conn.execute("DELETE FROM media WHERE uri = ?", (uri,)).await?;
12871319

12881320
Ok(())
@@ -1320,15 +1352,15 @@ impl EventCacheStoreMedia for SqliteEventCacheStore {
13201352
async fn media_retention_policy_inner(
13211353
&self,
13221354
) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
1323-
let conn = self.acquire().await?;
1355+
let conn = self.read().await?;
13241356
conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await
13251357
}
13261358

13271359
async fn set_media_retention_policy_inner(
13281360
&self,
13291361
policy: MediaRetentionPolicy,
13301362
) -> Result<(), Self::Error> {
1331-
let conn = self.acquire().await?;
1363+
let conn = self.write().await?;
13321364
conn.set_serialized_kv(keys::MEDIA_RETENTION_POLICY, policy).await?;
13331365
Ok(())
13341366
}
@@ -1352,7 +1384,7 @@ impl EventCacheStoreMedia for SqliteEventCacheStore {
13521384
let format = self.encode_key(keys::MEDIA, request.format.unique_key());
13531385
let timestamp = time_to_timestamp(last_access);
13541386

1355-
let conn = self.acquire().await?;
1387+
let conn = self.write().await?;
13561388
conn.execute(
13571389
"INSERT OR REPLACE INTO media (uri, format, data, last_access, ignore_policy) VALUES (?, ?, ?, ?, ?)",
13581390
(uri, format, data, timestamp, ignore_policy),
@@ -1371,7 +1403,7 @@ impl EventCacheStoreMedia for SqliteEventCacheStore {
13711403
let format = self.encode_key(keys::MEDIA, request.format.unique_key());
13721404
let ignore_policy = ignore_policy.is_yes();
13731405

1374-
let conn = self.acquire().await?;
1406+
let conn = self.write().await?;
13751407
conn.execute(
13761408
r#"UPDATE media SET ignore_policy = ? WHERE uri = ? AND format = ?"#,
13771409
(ignore_policy, uri, format),
@@ -1390,7 +1422,7 @@ impl EventCacheStoreMedia for SqliteEventCacheStore {
13901422
let format = self.encode_key(keys::MEDIA, request.format.unique_key());
13911423
let timestamp = time_to_timestamp(current_time);
13921424

1393-
let conn = self.acquire().await?;
1425+
let conn = self.write().await?;
13941426
let data = conn
13951427
.with_transaction::<_, rusqlite::Error, _>(move |txn| {
13961428
// Update the last access.
@@ -1421,7 +1453,7 @@ impl EventCacheStoreMedia for SqliteEventCacheStore {
14211453
let uri = self.encode_key(keys::MEDIA, uri);
14221454
let timestamp = time_to_timestamp(current_time);
14231455

1424-
let conn = self.acquire().await?;
1456+
let conn = self.write().await?;
14251457
let data = conn
14261458
.with_transaction::<_, rusqlite::Error, _>(move |txn| {
14271459
// Update the last access.
@@ -1451,7 +1483,7 @@ impl EventCacheStoreMedia for SqliteEventCacheStore {
14511483
return Ok(());
14521484
}
14531485

1454-
let conn = self.acquire().await?;
1486+
let conn = self.write().await?;
14551487
let removed = conn
14561488
.with_transaction::<_, Error, _>(move |txn| {
14571489
let mut removed = false;
@@ -1570,7 +1602,7 @@ impl EventCacheStoreMedia for SqliteEventCacheStore {
15701602
}
15711603

15721604
async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
1573-
let conn = self.acquire().await?;
1605+
let conn = self.read().await?;
15741606
conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await
15751607
}
15761608
}
@@ -1583,33 +1615,35 @@ async fn with_immediate_transaction<
15831615
T: Send + 'static,
15841616
F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
15851617
>(
1586-
conn: SqliteAsyncConn,
1618+
this: &SqliteEventCacheStore,
15871619
f: F,
15881620
) -> Result<T, Error> {
1589-
conn.interact(move |conn| -> Result<T, Error> {
1590-
// Start the transaction in IMMEDIATE mode since all updates may cause writes,
1591-
// to avoid read transactions upgrading to write mode and causing
1592-
// SQLITE_BUSY errors. See also: https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
1593-
conn.set_transaction_behavior(TransactionBehavior::Immediate);
1594-
1595-
let code = || -> Result<T, Error> {
1596-
let txn = conn.transaction()?;
1597-
let res = f(&txn)?;
1598-
txn.commit()?;
1599-
Ok(res)
1600-
};
1601-
1602-
let res = code();
1603-
1604-
// Reset the transaction behavior to use Deferred, after this transaction has
1605-
// been run, whether it was successful or not.
1606-
conn.set_transaction_behavior(TransactionBehavior::Deferred);
1607-
1608-
res
1609-
})
1610-
.await
1611-
// SAFETY: same logic as in [`deadpool::managed::Object::with_transaction`].`
1612-
.unwrap()
1621+
this.write()
1622+
.await?
1623+
.interact(move |conn| -> Result<T, Error> {
1624+
// Start the transaction in IMMEDIATE mode since all updates may cause writes,
1625+
// to avoid read transactions upgrading to write mode and causing
1626+
// SQLITE_BUSY errors. See also: https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
1627+
conn.set_transaction_behavior(TransactionBehavior::Immediate);
1628+
1629+
let code = || -> Result<T, Error> {
1630+
let txn = conn.transaction()?;
1631+
let res = f(&txn)?;
1632+
txn.commit()?;
1633+
Ok(res)
1634+
};
1635+
1636+
let res = code();
1637+
1638+
// Reset the transaction behavior to use Deferred, after this transaction has
1639+
// been run, whether it was successful or not.
1640+
conn.set_transaction_behavior(TransactionBehavior::Deferred);
1641+
1642+
res
1643+
})
1644+
.await
1645+
// SAFETY: same logic as in [`deadpool::managed::Object::with_transaction`].`
1646+
.unwrap()
16131647
}
16141648

16151649
fn insert_chunk(
@@ -1716,7 +1750,7 @@ mod tests {
17161750
async fn get_event_cache_store_content_sorted_by_last_access(
17171751
event_cache_store: &SqliteEventCacheStore,
17181752
) -> Vec<Vec<u8>> {
1719-
let sqlite_db = event_cache_store.acquire().await.expect("accessing sqlite db failed");
1753+
let sqlite_db = event_cache_store.read().await.expect("accessing sqlite db failed");
17201754
sqlite_db
17211755
.prepare("SELECT data FROM media ORDER BY last_access DESC", |mut stmt| {
17221756
stmt.query(())?.mapped(|row| row.get(0)).collect()
@@ -2006,7 +2040,7 @@ mod tests {
20062040

20072041
// Check that cascading worked. Yes, SQLite, I doubt you.
20082042
let gaps = store
2009-
.acquire()
2043+
.read()
20102044
.await
20112045
.unwrap()
20122046
.with_transaction(|txn| -> rusqlite::Result<_> {
@@ -2128,7 +2162,7 @@ mod tests {
21282162

21292163
// Make sure the position have been updated for the remaining events.
21302164
let num_rows: u64 = store
2131-
.acquire()
2165+
.read()
21322166
.await
21332167
.unwrap()
21342168
.with_transaction(move |txn| {
@@ -2277,7 +2311,7 @@ mod tests {
22772311

22782312
// Check that cascading worked. Yes, SQLite, I doubt you.
22792313
store
2280-
.acquire()
2314+
.read()
22812315
.await
22822316
.unwrap()
22832317
.with_transaction(|txn| -> rusqlite::Result<_> {

0 commit comments

Comments
 (0)