Skip to content

Commit c719cd1

Browse files
committed
fix(event cache): properly clear all rooms, including those sleeping in the store backend
1 parent 42133a6 commit c719cd1

File tree

2 files changed

+153
-10
lines changed

2 files changed

+153
-10
lines changed

crates/matrix-sdk/src/event_cache/mod.rs

Lines changed: 64 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use std::{
3535

3636
use eyeball::{SharedObservable, Subscriber};
3737
use eyeball_im::VectorDiff;
38+
use futures_util::future::{join_all, try_join_all};
3839
use matrix_sdk_base::{
3940
deserialized_responses::{AmbiguityChange, TimelineEvent},
4041
event_cache::store::{EventCacheStoreError, EventCacheStoreLock},
@@ -459,18 +460,73 @@ impl EventCacheInner {
459460

460461
/// Clears all the room's data.
461462
async fn clear_all_rooms(&self) -> Result<()> {
462-
// Note: one must NOT clear the `by_room` map, because if something subscribed
463-
// to a room update, they would never get any new update for that room, since
464-
// re-creating the `RoomEventCache` would create a new unrelated sender.
465-
466-
// Note 2: we don't need to clear the [`Self::events`] map, because events are
467-
// immutable in the Matrix protocol.
463+
// Okay, here's where things get complicated.
464+
//
465+
// On the one hand, `by_room` may include storage for *some* rooms that we know
466+
// about, but not *all* of them. Any room that hasn't been loaded in the
467+
// client, or touched by a sync, will remain unloaded in memory, so it
468+
// will be missing from `self.by_room`. As a result, we need to make
469+
// sure that we're hitting the storage backend to *really* clear all the
470+
// rooms, including those that haven't been loaded yet.
471+
//
472+
// On the other hand, one must NOT clear the `by_room` map, because if someone
473+
// subscribed to a room update, they would never get any new update for
474+
// that room, since re-creating the `RoomEventCache` would create a new,
475+
// unrelated sender.
476+
//
477+
// So we need to *keep* the rooms in `by_room` alive, while clearing them in the
478+
// store backend.
479+
//
480+
// As a result, for a short while, the in-memory linked chunks
481+
// will be desynchronized from the storage. We need to be careful then. During
482+
// that short while, we don't want *anyone* to touch the linked chunk
483+
// (be it in memory or in the storage).
484+
//
485+
// And since that requirement applies to *any* room in `by_room` at the same
486+
// time, we'll have to take the locks for *all* the live rooms, so as to
487+
// properly clear the underlying storage.
488+
//
489+
// At this point, you might be scared about the potential for deadlocking. I am
490+
// as well, but I'm convinced we're fine:
491+
// 1. the lock for `by_room` is usually held only for a short while, and
492+
// independently of the other two kinds.
493+
// 2. the state may acquire the store cross-process lock internally, but only
494+
// while the state's methods are called (so it's always transient). As a
495+
// result, as soon as we've acquired the state locks, the store lock ought to
496+
// be free.
497+
// 3. The store lock is held explicitly only in a small scoped area below.
498+
// 4. Then the store lock will be held internally when calling `reset()`, but at
499+
// this point it's only held for a short while each time, so rooms will take
500+
// turn to acquire it.
468501

469502
let rooms = self.by_room.write().await;
470-
for room in rooms.values() {
471-
room.clear().await?;
503+
504+
// Collect all the rooms' state locks, first: we can clear the storage only when
505+
// nobody will touch it at the same time.
506+
let room_locks = join_all(
507+
rooms.values().map(|room| async move { (room, room.inner.state.write().await) }),
508+
)
509+
.await;
510+
511+
// Clear the storage for all the rooms, using the storage facility.
512+
if let Some(store) = self.store.get() {
513+
let store_guard = store.lock().await?;
514+
store_guard.clear_all_rooms_chunks().await?;
472515
}
473516

517+
// At this point, all the in-memory linked chunks are desynchronized from the
518+
// storage. Resynchronize them manually by calling reset(), and
519+
// propagate updates to observers.
520+
try_join_all(room_locks.into_iter().map(|(room, mut state_guard)| async move {
521+
let updates_as_vector_diffs = state_guard.reset().await?;
522+
let _ = room.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
523+
diffs: updates_as_vector_diffs,
524+
origin: EventsOrigin::Cache,
525+
});
526+
Ok::<_, EventCacheError>(())
527+
}))
528+
.await?;
529+
474530
Ok(())
475531
}
476532

crates/matrix-sdk/tests/integration/event_cache.rs

Lines changed: 89 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,30 @@
1-
use std::{ops::Not, time::Duration};
1+
use std::{ops::Not, sync::Arc, time::Duration};
22

33
use assert_matches::assert_matches;
44
use assert_matches2::assert_let;
55
use eyeball_im::VectorDiff;
66
use futures_util::FutureExt;
7+
use imbl::Vector;
78
use matrix_sdk::{
89
assert_let_timeout, assert_next_matches_with_timeout,
910
deserialized_responses::TimelineEvent,
1011
event_cache::{
1112
BackPaginationOutcome, EventCacheError, RoomEventCacheUpdate, RoomPaginationStatus,
1213
},
1314
linked_chunk::{ChunkIdentifier, Position, Update},
15+
store::StoreConfig,
1416
test_utils::{
1517
assert_event_matches_msg,
1618
mocks::{MatrixMockServer, RoomMessagesResponseTemplate},
1719
},
1820
};
19-
use matrix_sdk_base::event_cache::Gap;
21+
use matrix_sdk_base::event_cache::{
22+
store::{EventCacheStore, MemoryStore},
23+
Gap,
24+
};
2025
use matrix_sdk_test::{
2126
async_test, event_factory::EventFactory, GlobalAccountDataTestEvent, JoinedRoomBuilder, ALICE,
27+
BOB,
2228
};
2329
use ruma::{
2430
event_id,
@@ -2540,3 +2546,84 @@ async fn test_dont_remove_only_gap() {
25402546
let outcome = room_event_cache.pagination().run_backwards_once(16).await.unwrap();
25412547
assert!(outcome.reached_start);
25422548
}
2549+
2550+
#[async_test]
2551+
async fn test_clear_all_rooms() {
2552+
let sleeping_room_id = room_id!("!dodo:saucisse.bzh");
2553+
let event_cache_store = Arc::new(MemoryStore::new());
2554+
2555+
let f = EventFactory::new().room(sleeping_room_id);
2556+
let ev0 = f.text_msg("hi").sender(*ALICE).event_id(event_id!("$ev0")).into_event();
2557+
2558+
// Feed the cache with one room with one event, before the client is created.
2559+
// This room will remain sleeping.
2560+
{
2561+
let cid = ChunkIdentifier::new(0);
2562+
event_cache_store
2563+
.handle_linked_chunk_updates(
2564+
sleeping_room_id,
2565+
vec![
2566+
Update::NewItemsChunk { previous: None, new: cid, next: None },
2567+
Update::PushItems { at: Position::new(cid, 0), items: vec![ev0] },
2568+
],
2569+
)
2570+
.await
2571+
.unwrap();
2572+
}
2573+
2574+
let server = MatrixMockServer::new().await;
2575+
let client = server
2576+
.client_builder()
2577+
.store_config(
2578+
StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
2579+
)
2580+
.build()
2581+
.await;
2582+
2583+
client.event_cache().subscribe().unwrap();
2584+
client.event_cache().enable_storage().unwrap();
2585+
2586+
// Another room gets a live event: it's loaded in the event cache now, while
2587+
// sleeping_room_id is not.
2588+
let room_id = room_id!("!galette:saucisse.bzh");
2589+
let room = server
2590+
.sync_room(
2591+
&client,
2592+
JoinedRoomBuilder::new(room_id).add_timeline_event(
2593+
f.text_msg("bonchourhan").sender(*BOB).event_id(event_id!("$ev1")),
2594+
),
2595+
)
2596+
.await;
2597+
2598+
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2599+
let (initial, mut room_updates) = room_event_cache.subscribe().await;
2600+
2601+
let mut initial = Vector::from(initial);
2602+
// Wait for the ev1 event.
2603+
if initial.is_empty() {
2604+
assert_let_timeout!(
2605+
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_updates.recv()
2606+
);
2607+
assert_eq!(diffs.len(), 1);
2608+
assert_matches!(diffs[0], VectorDiff::Append { .. });
2609+
diffs[0].clone().apply(&mut initial);
2610+
}
2611+
// The room state now contains one event.
2612+
assert_eq!(initial.len(), 1);
2613+
assert_event_id!(initial[0], "$ev1");
2614+
2615+
// Now, clear all the rooms.
2616+
client.event_cache().clear_all_rooms().await.unwrap();
2617+
2618+
// We should get an update for the live room.
2619+
assert_let_timeout!(
2620+
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_updates.recv()
2621+
);
2622+
assert_eq!(diffs.len(), 1);
2623+
assert_let!(VectorDiff::Clear = &diffs[0]);
2624+
2625+
// The sleeping room should have been cleared too.
2626+
let (maybe_last_chunk, _chunk_id_gen) =
2627+
event_cache_store.load_last_chunk(sleeping_room_id).await.unwrap();
2628+
assert!(maybe_last_chunk.is_none());
2629+
}

0 commit comments

Comments (0)