-
Notifications
You must be signed in to change notification settings - Fork 315
feat: Implement EventCacheStoreLock::lock()
with poison error, and ::lock_unchecked
#4285
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
b06dac3
dc31f93
d4e69a3
eff9827
fa18ed4
4ace281
5073cd1
cd4c12e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,6 +54,13 @@ use crate::{ | |
SendOutsideWasm, | ||
}; | ||
|
||
/// A lock generation is an integer incremented each time it is taken by another | ||
/// holder. This is not used by all cross-process locks. | ||
pub type LockGeneration = u64; | ||
|
||
/// Describe the first lock generation value (see [`LockGeneration`]). | ||
pub const FIRST_LOCK_GENERATION: LockGeneration = 0; | ||
|
||
/// Backing store for a cross-process lock. | ||
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] | ||
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] | ||
|
@@ -351,16 +358,23 @@ mod tests { | |
|
||
use super::{ | ||
memory_store_helper::try_take_leased_lock, BackingStore, CrossProcessStoreLock, | ||
CrossProcessStoreLockGuard, LockStoreError, EXTEND_LEASE_EVERY_MS, | ||
CrossProcessStoreLockGuard, LockGeneration, LockStoreError, EXTEND_LEASE_EVERY_MS, | ||
}; | ||
|
||
type HolderExpirationGeneration = (String, Instant, LockGeneration); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Care to either comment, or use a small struct with named fields, please? |
||
|
||
#[derive(Clone, Default)] | ||
struct TestStore { | ||
leases: Arc<RwLock<HashMap<String, (String, Instant)>>>, | ||
leases: Arc<RwLock<HashMap<String, HolderExpirationGeneration>>>, | ||
} | ||
|
||
impl TestStore { | ||
fn try_take_leased_lock(&self, lease_duration_ms: u32, key: &str, holder: &str) -> bool { | ||
fn try_take_leased_lock( | ||
&self, | ||
lease_duration_ms: u32, | ||
key: &str, | ||
holder: &str, | ||
) -> Option<LockGeneration> { | ||
try_take_leased_lock(&self.leases, lease_duration_ms, key, holder) | ||
} | ||
} | ||
|
@@ -380,7 +394,7 @@ mod tests { | |
key: &str, | ||
holder: &str, | ||
) -> Result<bool, Self::LockError> { | ||
Ok(self.try_take_leased_lock(lease_duration_ms, key, holder)) | ||
Ok(self.try_take_leased_lock(lease_duration_ms, key, holder).is_some()) | ||
} | ||
} | ||
|
||
|
@@ -506,36 +520,45 @@ pub mod memory_store_helper { | |
time::{Duration, Instant}, | ||
}; | ||
|
||
use super::{LockGeneration, FIRST_LOCK_GENERATION}; | ||
|
||
type HolderExpirationGeneration = (String, Instant, LockGeneration); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you deduplicate this? |
||
|
||
/// Try to acquire or to extend the lock. | ||
/// | ||
/// Return `Some` if the lock has been acquired (or extended). It contains | ||
/// the generation number. | ||
pub fn try_take_leased_lock( | ||
leases: &RwLock<HashMap<String, (String, Instant)>>, | ||
leases: &RwLock<HashMap<String, HolderExpirationGeneration>>, | ||
lease_duration_ms: u32, | ||
key: &str, | ||
holder: &str, | ||
) -> bool { | ||
) -> Option<LockGeneration> { | ||
let now = Instant::now(); | ||
let expiration = now + Duration::from_millis(lease_duration_ms.into()); | ||
|
||
match leases.write().unwrap().entry(key.to_owned()) { | ||
// There is an existing holder. | ||
Entry::Occupied(mut entry) => { | ||
let (current_holder, current_expiration) = entry.get_mut(); | ||
let (current_holder, current_expiration, current_generation) = entry.get_mut(); | ||
|
||
if current_holder == holder { | ||
// We had the lease before, extend it. | ||
*current_expiration = expiration; | ||
|
||
true | ||
Some(*current_generation) | ||
} else { | ||
// We didn't have it. | ||
if *current_expiration < now { | ||
// Steal it! | ||
*current_holder = holder.to_owned(); | ||
*current_expiration = expiration; | ||
*current_generation += 1; | ||
|
||
true | ||
Some(*current_generation) | ||
} else { | ||
// We tried our best. | ||
false | ||
None | ||
} | ||
} | ||
} | ||
|
@@ -545,9 +568,10 @@ pub mod memory_store_helper { | |
entry.insert(( | ||
holder.to_owned(), | ||
Instant::now() + Duration::from_millis(lease_duration_ms.into()), | ||
0, | ||
)); | ||
|
||
true | ||
Some(FIRST_LOCK_GENERATION) | ||
} | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
ALTER TABLE "lease_locks" ADD COLUMN "generation" INTEGER NOT NULL DEFAULT 0; |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime}; | |
use matrix_sdk_base::{ | ||
event_cache::store::EventCacheStore, | ||
media::{MediaRequestParameters, UniqueKey}, | ||
store_locks::LockGeneration, | ||
}; | ||
use matrix_sdk_store_encryption::StoreCipher; | ||
use ruma::MilliSecondsSinceUnixEpoch; | ||
|
@@ -28,7 +29,7 @@ mod keys { | |
/// This is used to figure whether the SQLite database requires a migration. | ||
/// Every new SQL migration should imply a bump of this number, and changes in | ||
/// the [`run_migrations`] function. | ||
const DATABASE_VERSION: u8 = 2; | ||
const DATABASE_VERSION: u8 = 3; | ||
|
||
/// A SQLite-based event cache store. | ||
#[derive(Clone)] | ||
|
@@ -142,6 +143,16 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> { | |
.await?; | ||
} | ||
|
||
if version < 3 { | ||
conn.with_transaction(|txn| { | ||
txn.execute_batch(include_str!( | ||
"../migrations/event_cache_store/003_lease_locks_with_generation.sql" | ||
))?; | ||
txn.set_db_version(3) | ||
}) | ||
.await?; | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
|
@@ -154,32 +165,41 @@ impl EventCacheStore for SqliteEventCacheStore { | |
lease_duration_ms: u32, | ||
key: &str, | ||
holder: &str, | ||
) -> Result<bool> { | ||
) -> Result<Option<LockGeneration>> { | ||
let key = key.to_owned(); | ||
let holder = holder.to_owned(); | ||
|
||
let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into(); | ||
let expiration = now + lease_duration_ms as u64; | ||
|
||
let num_touched = self | ||
let generation = self | ||
.acquire() | ||
.await? | ||
.with_transaction(move |txn| { | ||
txn.execute( | ||
txn.query_row( | ||
"INSERT INTO lease_locks (key, holder, expiration) | ||
VALUES (?1, ?2, ?3) | ||
ON CONFLICT (key) | ||
DO | ||
UPDATE SET holder = ?2, expiration = ?3 | ||
WHERE holder = ?2 | ||
ON CONFLICT (key) DO | ||
UPDATE SET | ||
holder = excluded.holder, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add in a comment where does this |
||
expiration = excluded.expiration, | ||
generation = | ||
CASE holder | ||
WHEN excluded.holder THEN generation | ||
ELSE generation + 1 | ||
END | ||
WHERE | ||
holder = excluded.holder | ||
OR expiration < ?4 | ||
", | ||
RETURNING generation", | ||
(key, holder, expiration, now), | ||
|row| row.get(0), | ||
) | ||
.optional() | ||
}) | ||
.await?; | ||
|
||
Ok(num_touched == 1) | ||
Ok(generation) | ||
} | ||
|
||
async fn add_media_content( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the meaning of the last sentence here? Consider removing it now, since we're likely to forget about this comment and let it rot even in a future where the crypto store lock would use it?