
Commit faa0e6e (parent: b95cf79)

feat(event cache): allow using the bloom filter OR the store to deduplicate events

File tree: 3 files changed, +120 -39 lines


crates/matrix-sdk/src/event_cache/deduplicator.rs

Lines changed: 104 additions & 23 deletions

@@ -18,12 +18,109 @@
 use std::{collections::BTreeSet, fmt, sync::Mutex};
 
 use growable_bloom_filter::{GrowableBloom, GrowableBloomBuilder};
-use ruma::OwnedEventId;
+use matrix_sdk_base::event_cache::store::EventCacheStoreLock;
+use ruma::{OwnedEventId, OwnedRoomId};
 use tracing::{debug, warn};
 
-use super::room::events::{Event, RoomEvents};
+use super::{
+    room::events::{Event, RoomEvents},
+    EventCacheError,
+};
 
-/// `Deduplicator` is an efficient type to find duplicated events.
+pub enum Deduplicator {
+    InMemory(BloomFilterDeduplicator),
+    PersistentStore(StoreDeduplicator),
+}
+
+impl Deduplicator {
+    /// Create an empty deduplicator instance that uses an internal Bloom
+    /// filter.
+    ///
+    /// Such a deduplicator is stateful, with no initial known events, and it
+    /// will learn over time by using a Bloom filter which events are
+    /// duplicates or not.
+    ///
+    /// When the persistent storage is enabled by default, this constructor
+    /// (and the associated variant) will be removed.
+    pub fn new_memory_based() -> Self {
+        Self::InMemory(BloomFilterDeduplicator::new())
+    }
+
+    /// Create new store-based deduplicator that will run queries against the
+    /// store to find if any event is deduplicated or not.
+    ///
+    /// This deduplicator is stateless.
+    ///
+    /// When the persistent storage is enabled by default, this will become the
+    /// default, and [`Deduplicator`] will be replaced with
+    /// [`StoreDeduplicator`].
+    pub fn new_store_based(room_id: OwnedRoomId, store: EventCacheStoreLock) -> Self {
+        Self::PersistentStore(StoreDeduplicator { room_id, store })
+    }
+
+    /// Find duplicates in the given collection of events, and return both
+    /// valid events (those with an event id) as well as the event ids of
+    /// duplicate events.
+    pub async fn filter_duplicate_events<I>(
+        &self,
+        events: I,
+        room_events: &RoomEvents,
+    ) -> Result<(Vec<Event>, Vec<OwnedEventId>), EventCacheError>
+    where
+        I: Iterator<Item = Event>,
+    {
+        match self {
+            Deduplicator::InMemory(dedup) => Ok(dedup.filter_duplicate_events(events, room_events)),
+            Deduplicator::PersistentStore(dedup) => dedup.filter_duplicate_events(events).await,
+        }
+    }
+}
+
+/// A deduplication mechanism based on the persistent storage associated to the
+/// event cache.
+///
+/// It will use queries to the persistent storage to figure where events are
+/// duplicates or not, making it entirely stateless.
+pub struct StoreDeduplicator {
+    /// The room this deduplicator applies to.
+    room_id: OwnedRoomId,
+    /// The actual event cache store implementation used to query events.
+    store: EventCacheStoreLock,
+}
+
+impl StoreDeduplicator {
+    async fn filter_duplicate_events<I>(
+        &self,
+        events: I,
+    ) -> Result<(Vec<Event>, Vec<OwnedEventId>), EventCacheError>
+    where
+        I: Iterator<Item = Event>,
+    {
+        let store = self.store.lock().await?;
+
+        // Collect event ids as we "validate" events (i.e. check they have a valid event
+        // id.)
+        let mut event_ids = Vec::new();
+        let events = events
+            .filter_map(|event| {
+                if let Some(event_id) = event.event_id() {
+                    event_ids.push(event_id);
+                    Some(event)
+                } else {
+                    None
+                }
+            })
+            .collect::<Vec<_>>();
+
+        // Let the store do its magic ✨
+        let duplicates = store.filter_duplicated_events(&self.room_id, event_ids).await?;
+
+        Ok((events, duplicates))
+    }
+}
+
+/// `BloomFilterDeduplicator` is an efficient type to find duplicated events,
+/// using an in-memory cache.
 ///
 /// It uses a [bloom filter] to provide a memory efficient probabilistic answer
 /// to: “has event E been seen already?”. False positives are possible, while

@@ -49,34 +146,18 @@ impl BloomFilterDeduplicator
     const DESIRED_FALSE_POSITIVE_RATE: f64 = 0.01;
 
     /// Create a new `Deduplicator` with no prior knowledge of known events.
-    #[cfg(test)]
-    pub fn new() -> Self {
-        Self::with_initial_events(std::iter::empty())
-    }
-
-    /// Create a new `Deduplicator` filled with initial events.
-    ///
-    /// This won't detect duplicates in the initial events, only learn about
-    /// those events.
-    pub fn with_initial_events<'a>(events: impl Iterator<Item = &'a Event>) -> Self {
-        let mut bloom_filter = GrowableBloomBuilder::new()
+    fn new() -> Self {
+        let bloom_filter = GrowableBloomBuilder::new()
             .estimated_insertions(Self::APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS)
             .desired_error_ratio(Self::DESIRED_FALSE_POSITIVE_RATE)
             .build();
-        for e in events {
-            let Some(event_id) = e.event_id() else {
-                warn!("initial event in deduplicator had no event id");
-                continue;
-            };
-            bloom_filter.insert(event_id);
-        }
         Self { bloom_filter: Mutex::new(bloom_filter) }
     }
 
     /// Find duplicates in the given collection of events, and return both
     /// valid events (those with an event id) as well as the event ids of
     /// duplicate events.
-    pub fn filter_duplicate_events<'a, I>(
+    fn filter_duplicate_events<'a, I>(
         &'a self,
         events: I,
         room_events: &'a RoomEvents,

@@ -184,7 +265,7 @@ impl BloomFilterDeduplicator
 
 /// Information about the scanned collection of events.
 #[derive(Debug)]
-pub enum Decoration<I> {
+enum Decoration<I> {
     /// This event is not duplicated.
     Unique(I),
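For orientation, here is a rough sketch of how the two strategies are meant to be driven through the new Deduplicator enum. It is not code from this commit: the helper function, its use_store flag, and the way the store handle and room id are obtained are invented for illustration, and it assumes crate-internal access to the event cache types (the enum is not part of the public API).

// Hypothetical helper, for illustration only (not part of this commit).
// Assumes it lives somewhere inside the matrix-sdk crate with access to the
// crate-internal event cache types.
use matrix_sdk_base::event_cache::store::EventCacheStoreLock;
use ruma::{OwnedEventId, OwnedRoomId};

use crate::event_cache::{
    deduplicator::Deduplicator,
    room::events::{Event, RoomEvents},
    EventCacheError,
};

async fn deduplicate_batch(
    use_store: bool,
    room_id: OwnedRoomId,
    store: EventCacheStoreLock,
    room_events: &RoomEvents,
    events: Vec<Event>,
) -> Result<(Vec<Event>, Vec<OwnedEventId>), EventCacheError> {
    // Pick a strategy: stateless queries against the persistent store when it
    // is available, otherwise the stateful in-memory Bloom filter.
    let deduplicator = if use_store {
        Deduplicator::new_store_based(room_id, store)
    } else {
        Deduplicator::new_memory_based()
    };

    // Both variants are driven through the same async entry point; the Bloom
    // filter answers synchronously, the store variant awaits its storage
    // queries and can return an EventCacheError.
    deduplicator.filter_duplicate_events(events.into_iter(), room_events).await
}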

crates/matrix-sdk/src/event_cache/pagination.rs

Lines changed: 1 addition & 1 deletion

@@ -181,7 +181,7 @@ impl RoomPagination
             .collect::<Vec<_>>();
 
         let (new_events, duplicated_event_ids, all_deduplicated) =
-            state.collect_valid_and_duplicated_events(sync_events.clone().into_iter());
+            state.collect_valid_and_duplicated_events(sync_events.clone().into_iter()).await?;
 
         let (backpagination_outcome, sync_timeline_events_diffs) = state
             .with_events_mut(move |room_events| {

crates/matrix-sdk/src/event_cache/room/mod.rs

Lines changed: 15 additions & 15 deletions

@@ -424,8 +424,9 @@ impl RoomEventCacheInner
             return Ok(());
         }
 
-        let (events, duplicated_event_ids, all_duplicates) =
-            state.collect_valid_and_duplicated_events(sync_timeline_events.clone().into_iter());
+        let (events, duplicated_event_ids, all_duplicates) = state
+            .collect_valid_and_duplicated_events(sync_timeline_events.clone().into_iter())
+            .await?;
 
         let sync_timeline_events_diffs = if all_duplicates {
             // No new events, thus no need to change the room events.

@@ -543,7 +544,7 @@ mod private
     use tracing::{error, instrument, trace};
 
     use super::{chunk_debug_string, events::RoomEvents};
-    use crate::event_cache::{deduplicator::BloomFilterDeduplicator, EventCacheError};
+    use crate::event_cache::{deduplicator::Deduplicator, EventCacheError};
 
     /// State for a single room's event cache.
     ///

@@ -563,7 +564,7 @@
         events: RoomEvents,
 
         /// The events deduplicator instance to help finding duplicates.
-        deduplicator: BloomFilterDeduplicator,
+        deduplicator: Deduplicator,
 
         /// Have we ever waited for a previous-batch-token to come from sync, in
         /// the context of pagination? We do this at most once per room,

@@ -602,7 +603,7 @@
            room: OwnedRoomId,
            store: Arc<OnceCell<EventCacheStoreLock>>,
        ) -> Result<Self, EventCacheError> {
-            let events = if let Some(store) = store.get() {
+            let (events, deduplicator) = if let Some(store) = store.get() {
                let locked = store.lock().await?;
 
                // Try to reload a linked chunk from storage. If it fails, log the error and

@@ -620,15 +621,14 @@
                    }
                };
 
-                RoomEvents::with_initial_chunks(linked_chunk)
+                (
+                    RoomEvents::with_initial_chunks(linked_chunk),
+                    Deduplicator::new_store_based(room.clone(), store.clone()),
+                )
            } else {
-                RoomEvents::default()
+                (RoomEvents::default(), Deduplicator::new_memory_based())
            };
 
-            let deduplicator = BloomFilterDeduplicator::with_initial_events(
-                events.events().map(|(_pos, event)| event),
-            );
-
            Ok(Self { room, store, events, deduplicator, waited_for_initial_prev_token: false })
        }
 

@@ -658,19 +658,19 @@ mod private
        /// possibly misplace them. And we should not be missing
        /// events either: the already-known events would have their own
        /// previous-batch token (it might already be consumed).
-        pub fn collect_valid_and_duplicated_events<'a, I>(
+        pub async fn collect_valid_and_duplicated_events<'a, I>(
            &'a mut self,
            events: I,
-        ) -> (Vec<Event>, Vec<OwnedEventId>, bool)
+        ) -> Result<(Vec<Event>, Vec<OwnedEventId>, bool), EventCacheError>
        where
            I: Iterator<Item = Event> + 'a,
        {
            let (events, duplicated_event_ids) =
-                self.deduplicator.filter_duplicate_events(events, &self.events);
+                self.deduplicator.filter_duplicate_events(events, &self.events).await?;
 
            let all_duplicates = !events.is_empty() && events.len() == duplicated_event_ids.len();
 
-            (events, duplicated_event_ids, all_duplicates)
+            Ok((events, duplicated_event_ids, all_duplicates))
        }
 
        /// Removes the bundled relations from an event, if they were present.
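One detail worth spelling out from the helper above is its third returned value: a batch only counts as "all duplicates" when it is non-empty and every valid event in it was reported as already known, which is what lets both the sync and the pagination paths skip touching the room's events. A standalone restatement of that check, with the function name and slice parameters invented for illustration:

// Illustration only: restates the `all_duplicates` check performed by
// `collect_valid_and_duplicated_events` after this change.
use ruma::OwnedEventId;

fn batch_is_all_duplicates<E>(valid_events: &[E], duplicated_event_ids: &[OwnedEventId]) -> bool {
    // An empty batch is not "all duplicates": there is nothing to skip.
    // Otherwise, if every valid (id-carrying) event was reported as a
    // duplicate, the caller can leave the room's events untouched.
    !valid_events.is_empty() && valid_events.len() == duplicated_event_ids.len()
}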
