Skip to content

Commit 3d228bd

Browse files
committed
feat: Event cache lock has a _generation_ value.
This patch introduces the _generation_ value for the event cache lock: every time the lock is acquired by a new holder, the generation is incremented by 1. This is fundamental to know if the lock has been poisoned or not.
1 parent 21bb85a commit 3d228bd

File tree

9 files changed

+93
-46
lines changed

9 files changed

+93
-46
lines changed

crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -260,57 +260,57 @@ macro_rules! event_cache_store_integration_tests_time {
260260
let store = get_event_cache_store().await.unwrap().into_event_cache_store();
261261

262262
let acquired0 = store.try_take_leased_lock(0, "key", "alice").await.unwrap();
263-
assert!(acquired0);
263+
assert_eq!(acquired0, Some(0)); // first lock generation
264264

265265
// Should extend the lease automatically (same holder).
266266
let acquired2 = store.try_take_leased_lock(300, "key", "alice").await.unwrap();
267-
assert!(acquired2);
267+
assert_eq!(acquired2, Some(0)); // same lock generation
268268

269269
// Should extend the lease automatically (same holder + time is ok).
270270
let acquired3 = store.try_take_leased_lock(300, "key", "alice").await.unwrap();
271-
assert!(acquired3);
271+
assert_eq!(acquired3, Some(0)); // same lock generation
272272

273273
// Another attempt at taking the lock should fail, because it's taken.
274274
let acquired4 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
275-
assert!(!acquired4);
275+
assert!(acquired4.is_none()); // not acquired
276276

277277
// Even if we insist.
278278
let acquired5 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
279-
assert!(!acquired5);
279+
assert!(acquired5.is_none()); // not acquired
280280

281281
// That's a nice test we got here, go take a little nap.
282282
tokio::time::sleep(Duration::from_millis(50)).await;
283283

284284
// Still too early.
285285
let acquired55 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
286-
assert!(!acquired55);
286+
assert!(acquired55.is_none()); // not acquired
287287

288288
// Ok you can take another nap then.
289289
tokio::time::sleep(Duration::from_millis(250)).await;
290290

291291
// At some point, we do get the lock.
292292
let acquired6 = store.try_take_leased_lock(0, "key", "bob").await.unwrap();
293-
assert!(acquired6);
293+
assert_eq!(acquired6, Some(1)); // new lock generation!
294294

295295
tokio::time::sleep(Duration::from_millis(1)).await;
296296

297297
// The other gets it almost immediately too.
298298
let acquired7 = store.try_take_leased_lock(0, "key", "alice").await.unwrap();
299-
assert!(acquired7);
299+
assert_eq!(acquired7, Some(2)); // new lock generation!
300300

301301
tokio::time::sleep(Duration::from_millis(1)).await;
302302

303-
// But when we take a longer lease...
303+
// But when we take a longer lease
304304
let acquired8 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
305-
assert!(acquired8);
305+
assert_eq!(acquired8, Some(3)); // new lock generation!
306306

307307
// It blocks the other user.
308308
let acquired9 = store.try_take_leased_lock(300, "key", "alice").await.unwrap();
309-
assert!(!acquired9);
309+
assert!(acquired9.is_none()); // not acquired
310310

311311
// We can hold onto our lease.
312312
let acquired10 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
313-
assert!(acquired10);
313+
assert_eq!(acquired10, Some(3)); // same lock generation
314314
}
315315
}
316316
};

crates/matrix-sdk-base/src/event_cache/store/memory_store.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ use std::{collections::HashMap, num::NonZeroUsize, sync::RwLock as StdRwLock, ti
1616

1717
use async_trait::async_trait;
1818
use matrix_sdk_common::{
19-
ring_buffer::RingBuffer, store_locks::memory_store_helper::try_take_leased_lock,
19+
ring_buffer::RingBuffer,
20+
store_locks::{memory_store_helper::try_take_leased_lock, LockGeneration},
2021
};
2122
use ruma::{MxcUri, OwnedMxcUri};
2223

@@ -30,7 +31,7 @@ use crate::media::{MediaRequestParameters, UniqueKey as _};
3031
#[derive(Debug)]
3132
pub struct MemoryStore {
3233
media: StdRwLock<RingBuffer<(OwnedMxcUri, String /* unique key */, Vec<u8>)>>,
33-
leases: StdRwLock<HashMap<String, (String, Instant)>>,
34+
leases: StdRwLock<HashMap<String, (String, Instant, LockGeneration)>>,
3435
}
3536

3637
// SAFETY: `new_unchecked` is safe because 20 is not zero.
@@ -62,7 +63,7 @@ impl EventCacheStore for MemoryStore {
6263
lease_duration_ms: u32,
6364
key: &str,
6465
holder: &str,
65-
) -> Result<bool, Self::Error> {
66+
) -> Result<Option<LockGeneration>, Self::Error> {
6667
Ok(try_take_leased_lock(&self.leases, lease_duration_ms, key, holder))
6768
}
6869

crates/matrix-sdk-base/src/event_cache/store/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
//! into the event cache for the actual storage. By default this brings an
2020
//! in-memory store.
2121
22-
use std::{fmt, ops::Deref, str::Utf8Error, sync::Arc};
22+
use std::{fmt, ops::Deref, result::Result as StdResult, str::Utf8Error, sync::Arc};
2323

2424
#[cfg(any(test, feature = "testing"))]
2525
#[macro_use]
@@ -160,7 +160,7 @@ impl EventCacheStoreError {
160160
}
161161

162162
/// An `EventCacheStore` specific result type.
163-
pub type Result<T, E = EventCacheStoreError> = std::result::Result<T, E>;
163+
pub type Result<T, E = EventCacheStoreError> = StdResult<T, E>;
164164

165165
/// A type that wraps the [`EventCacheStore`] but implements [`BackingStore`] to
166166
/// make it usable inside the cross process lock.
@@ -177,7 +177,7 @@ impl BackingStore for LockableEventCacheStore {
177177
lease_duration_ms: u32,
178178
key: &str,
179179
holder: &str,
180-
) -> std::result::Result<bool, Self::LockError> {
181-
self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
180+
) -> StdResult<bool, Self::LockError> {
181+
Ok(self.0.try_take_leased_lock(lease_duration_ms, key, holder).await?.is_some())
182182
}
183183
}

crates/matrix-sdk-base/src/event_cache/store/traits.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
use std::{fmt, sync::Arc};
1616

1717
use async_trait::async_trait;
18-
use matrix_sdk_common::AsyncTraitDeps;
18+
use matrix_sdk_common::{store_locks::LockGeneration, AsyncTraitDeps};
1919
use ruma::MxcUri;
2020

2121
use super::EventCacheStoreError;
@@ -35,7 +35,7 @@ pub trait EventCacheStore: AsyncTraitDeps {
3535
lease_duration_ms: u32,
3636
key: &str,
3737
holder: &str,
38-
) -> Result<bool, Self::Error>;
38+
) -> Result<Option<LockGeneration>, Self::Error>;
3939

4040
/// Add a media file's content in the media store.
4141
///
@@ -127,7 +127,7 @@ impl<T: EventCacheStore> EventCacheStore for EraseEventCacheStoreError<T> {
127127
lease_duration_ms: u32,
128128
key: &str,
129129
holder: &str,
130-
) -> Result<bool, Self::Error> {
130+
) -> Result<Option<LockGeneration>, Self::Error> {
131131
self.0.try_take_leased_lock(lease_duration_ms, key, holder).await.map_err(Into::into)
132132
}
133133

crates/matrix-sdk-common/src/store_locks.rs

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,13 @@ use crate::{
5454
SendOutsideWasm,
5555
};
5656

57+
/// A lock generation is an integer incremented each time it is taken by another
58+
/// holder. This is not used by all cross-process locks.
59+
pub type LockGeneration = u64;
60+
61+
/// Describe the first lock generation value (see [`LockGeneration`]).
62+
pub const FIRST_LOCK_GENERATION: u64 = 0;
63+
5764
/// Backing store for a cross-process lock.
5865
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
5966
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
@@ -351,16 +358,23 @@ mod tests {
351358

352359
use super::{
353360
memory_store_helper::try_take_leased_lock, BackingStore, CrossProcessStoreLock,
354-
CrossProcessStoreLockGuard, LockStoreError, EXTEND_LEASE_EVERY_MS,
361+
CrossProcessStoreLockGuard, LockGeneration, LockStoreError, EXTEND_LEASE_EVERY_MS,
355362
};
356363

364+
type HolderExpirationGeneration = (String, Instant, LockGeneration);
365+
357366
#[derive(Clone, Default)]
358367
struct TestStore {
359-
leases: Arc<RwLock<HashMap<String, (String, Instant)>>>,
368+
leases: Arc<RwLock<HashMap<String, HolderExpirationGeneration>>>,
360369
}
361370

362371
impl TestStore {
363-
fn try_take_leased_lock(&self, lease_duration_ms: u32, key: &str, holder: &str) -> bool {
372+
fn try_take_leased_lock(
373+
&self,
374+
lease_duration_ms: u32,
375+
key: &str,
376+
holder: &str,
377+
) -> Option<LockGeneration> {
364378
try_take_leased_lock(&self.leases, lease_duration_ms, key, holder)
365379
}
366380
}
@@ -380,7 +394,7 @@ mod tests {
380394
key: &str,
381395
holder: &str,
382396
) -> Result<bool, Self::LockError> {
383-
Ok(self.try_take_leased_lock(lease_duration_ms, key, holder))
397+
Ok(self.try_take_leased_lock(lease_duration_ms, key, holder).is_some())
384398
}
385399
}
386400

@@ -506,36 +520,45 @@ pub mod memory_store_helper {
506520
time::{Duration, Instant},
507521
};
508522

523+
use super::LockGeneration;
524+
525+
type HolderExpirationGeneration = (String, Instant, LockGeneration);
526+
527+
/// Try to acquire or to extend the lock.
528+
///
529+
/// Return `Some` if the lock has been acquired (or extended). It contains
530+
/// the generation number.
509531
pub fn try_take_leased_lock(
510-
leases: &RwLock<HashMap<String, (String, Instant)>>,
532+
leases: &RwLock<HashMap<String, HolderExpirationGeneration>>,
511533
lease_duration_ms: u32,
512534
key: &str,
513535
holder: &str,
514-
) -> bool {
536+
) -> Option<LockGeneration> {
515537
let now = Instant::now();
516538
let expiration = now + Duration::from_millis(lease_duration_ms.into());
517539

518540
match leases.write().unwrap().entry(key.to_owned()) {
519541
// There is an existing holder.
520542
Entry::Occupied(mut entry) => {
521-
let (current_holder, current_expiration) = entry.get_mut();
543+
let (current_holder, current_expiration, current_generation) = entry.get_mut();
522544

523545
if current_holder == holder {
524546
// We had the lease before, extend it.
525547
*current_expiration = expiration;
526548

527-
true
549+
Some(*current_generation)
528550
} else {
529551
// We didn't have it.
530552
if *current_expiration < now {
531553
// Steal it!
532554
*current_holder = holder.to_owned();
533555
*current_expiration = expiration;
556+
*current_generation += 1;
534557

535-
true
558+
Some(*current_generation)
536559
} else {
537560
// We tried our best.
538-
false
561+
None
539562
}
540563
}
541564
}
@@ -545,9 +568,10 @@ pub mod memory_store_helper {
545568
entry.insert((
546569
holder.to_owned(),
547570
Instant::now() + Duration::from_millis(lease_duration_ms.into()),
571+
0,
548572
));
549573

550-
true
574+
Some(0)
551575
}
552576
}
553577
}

crates/matrix-sdk-crypto/src/store/memorystore.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use std::{
2020
};
2121

2222
use async_trait::async_trait;
23-
use matrix_sdk_common::store_locks::memory_store_helper::try_take_leased_lock;
23+
use matrix_sdk_common::store_locks::{memory_store_helper::try_take_leased_lock, LockGeneration};
2424
use ruma::{
2525
events::secret::request::SecretName, DeviceId, OwnedDeviceId, OwnedRoomId, OwnedTransactionId,
2626
OwnedUserId, RoomId, TransactionId, UserId,
@@ -90,7 +90,7 @@ pub struct MemoryStore {
9090
key_requests_by_info: StdRwLock<HashMap<String, OwnedTransactionId>>,
9191
direct_withheld_info: StdRwLock<HashMap<OwnedRoomId, HashMap<String, RoomKeyWithheldEvent>>>,
9292
custom_values: StdRwLock<HashMap<String, Vec<u8>>>,
93-
leases: StdRwLock<HashMap<String, (String, Instant)>>,
93+
leases: StdRwLock<HashMap<String, (String, Instant, LockGeneration)>>,
9494
secret_inbox: StdRwLock<HashMap<String, Vec<GossippedSecret>>>,
9595
backup_keys: RwLock<BackupKeys>,
9696
next_batch_token: RwLock<Option<String>>,
@@ -632,7 +632,7 @@ impl CryptoStore for MemoryStore {
632632
key: &str,
633633
holder: &str,
634634
) -> Result<bool> {
635-
Ok(try_take_leased_lock(&self.leases, lease_duration_ms, key, holder))
635+
Ok(try_take_leased_lock(&self.leases, lease_duration_ms, key, holder).is_some())
636636
}
637637
}
638638

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ALTER TABLE "lease_locks" ADD COLUMN "generation" INTEGER NOT NULL DEFAULT 0;

crates/matrix-sdk-sqlite/src/event_cache_store.rs

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
55
use matrix_sdk_base::{
66
event_cache::store::EventCacheStore,
77
media::{MediaRequestParameters, UniqueKey},
8+
store_locks::LockGeneration,
89
};
910
use matrix_sdk_store_encryption::StoreCipher;
1011
use ruma::MilliSecondsSinceUnixEpoch;
@@ -28,7 +29,7 @@ mod keys {
2829
/// This is used to figure whether the SQLite database requires a migration.
2930
/// Every new SQL migration should imply a bump of this number, and changes in
3031
/// the [`run_migrations`] function.
31-
const DATABASE_VERSION: u8 = 2;
32+
const DATABASE_VERSION: u8 = 3;
3233

3334
/// A SQLite-based event cache store.
3435
#[derive(Clone)]
@@ -142,6 +143,16 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
142143
.await?;
143144
}
144145

146+
if version < 3 {
147+
conn.with_transaction(|txn| {
148+
txn.execute_batch(include_str!(
149+
"../migrations/event_cache_store/003_lease_locks_with_generation.sql"
150+
))?;
151+
txn.set_db_version(3)
152+
})
153+
.await?;
154+
}
155+
145156
Ok(())
146157
}
147158

@@ -154,32 +165,41 @@ impl EventCacheStore for SqliteEventCacheStore {
154165
lease_duration_ms: u32,
155166
key: &str,
156167
holder: &str,
157-
) -> Result<bool> {
168+
) -> Result<Option<LockGeneration>> {
158169
let key = key.to_owned();
159170
let holder = holder.to_owned();
160171

161172
let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
162173
let expiration = now + lease_duration_ms as u64;
163174

164-
let num_touched = self
175+
let generation = self
165176
.acquire()
166177
.await?
167178
.with_transaction(move |txn| {
168-
txn.execute(
179+
txn.query_row(
169180
"INSERT INTO lease_locks (key, holder, expiration)
170181
VALUES (?1, ?2, ?3)
171-
ON CONFLICT (key)
172-
DO
173-
UPDATE SET holder = ?2, expiration = ?3
174-
WHERE holder = ?2
182+
ON CONFLICT (key) DO
183+
UPDATE SET
184+
holder = excluded.holder,
185+
expiration = excluded.expiration,
186+
generation =
187+
CASE holder
188+
WHEN excluded.holder THEN generation
189+
ELSE generation + 1
190+
END
191+
WHERE
192+
holder = excluded.holder
175193
OR expiration < ?4
176-
",
194+
RETURNING generation",
177195
(key, holder, expiration, now),
196+
|row| -> rusqlite::Result<u64> { row.get(0) },
178197
)
198+
.optional()
179199
})
180200
.await?;
181201

182-
Ok(num_touched == 1)
202+
Ok(generation)
183203
}
184204

185205
async fn add_media_content(

crates/matrix-sdk/src/event_cache/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ impl EventCache {
225225
mut room_updates_feed: Receiver<RoomUpdates>,
226226
) {
227227
trace!("Spawning the listen task");
228+
228229
loop {
229230
match room_updates_feed.recv().await {
230231
Ok(updates) => {

0 commit comments

Comments
 (0)