Skip to content

Commit 2e49719

Browse files
zecakehjplatte
authored andcommitted
fix(stores): Separate receipts by thread
The key encoding in key-value stores is backwards-compatible so old receipts are counted as unthreaded receipts. Signed-off-by: Kévin Commaille <zecakeh@tedomum.fr>
1 parent 9707d73 commit 2e49719

File tree

8 files changed

+320
-63
lines changed

8 files changed

+320
-63
lines changed

crates/matrix-sdk-base/src/rooms/normal.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use futures_util::stream::{self, StreamExt};
2121
use ruma::{
2222
api::client::sync::sync_events::v3::RoomSummary as RumaSummary,
2323
events::{
24-
receipt::{Receipt, ReceiptType},
24+
receipt::{Receipt, ReceiptThread, ReceiptType},
2525
room::{
2626
create::RoomCreateEventContent, encryption::RoomEncryptionEventContent,
2727
guest_access::GuestAccess, history_visibility::HistoryVisibility, join_rules::JoinRule,
@@ -483,21 +483,27 @@ impl Room {
483483
}
484484

485485
/// Get the read receipt as a `EventId` and `Receipt` tuple for the given
486-
/// `user_id` in this room.
486+
/// `thread` and `user_id` in this room.
487487
pub async fn user_read_receipt(
488488
&self,
489+
thread: ReceiptThread,
489490
user_id: &UserId,
490491
) -> StoreResult<Option<(OwnedEventId, Receipt)>> {
491-
self.store.get_user_room_receipt_event(self.room_id(), ReceiptType::Read, user_id).await
492+
self.store
493+
.get_user_room_receipt_event(self.room_id(), ReceiptType::Read, thread, user_id)
494+
.await
492495
}
493496

494497
/// Get the read receipts as a list of `UserId` and `Receipt` tuples for the
495-
/// given `event_id` in this room.
498+
/// given `thread` and `event_id` in this room.
496499
pub async fn event_read_receipts(
497500
&self,
501+
thread: ReceiptThread,
498502
event_id: &EventId,
499503
) -> StoreResult<Vec<(OwnedUserId, Receipt)>> {
500-
self.store.get_event_room_receipt_events(self.room_id(), ReceiptType::Read, event_id).await
504+
self.store
505+
.get_event_room_receipt_events(self.room_id(), ReceiptType::Read, thread, event_id)
506+
.await
501507
}
502508
}
503509

crates/matrix-sdk-base/src/store/integration_tests.rs

Lines changed: 69 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ macro_rules! statestore_integration_tests {
134134
event_id,
135135
events::{
136136
presence::PresenceEvent,
137-
receipt::ReceiptType,
137+
receipt::{ReceiptType, ReceiptThread},
138138
room::{
139139
member::{
140140
MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent,
@@ -475,14 +475,20 @@ macro_rules! statestore_integration_tests {
475475
.await?
476476
.is_some());
477477
assert!(store
478-
.get_user_room_receipt_event(room_id, ReceiptType::Read, user_id)
478+
.get_user_room_receipt_event(
479+
room_id,
480+
ReceiptType::Read,
481+
ReceiptThread::Unthreaded,
482+
user_id
483+
)
479484
.await?
480485
.is_some());
481486
assert_eq!(
482487
store
483488
.get_event_room_receipt_events(
484489
room_id,
485490
ReceiptType::Read,
491+
ReceiptThread::Unthreaded,
486492
first_receipt_event_id()
487493
)
488494
.await?
@@ -613,17 +619,32 @@ macro_rules! statestore_integration_tests {
613619
.expect("json creation failed");
614620

615621
assert!(store
616-
.get_user_room_receipt_event(room_id, ReceiptType::Read, user_id())
622+
.get_user_room_receipt_event(
623+
room_id,
624+
ReceiptType::Read,
625+
ReceiptThread::Unthreaded,
626+
user_id()
627+
)
617628
.await
618629
.expect("failed to read user room receipt")
619630
.is_none());
620631
assert!(store
621-
.get_event_room_receipt_events(room_id, ReceiptType::Read, &first_event_id)
632+
.get_event_room_receipt_events(
633+
room_id,
634+
ReceiptType::Read,
635+
ReceiptThread::Unthreaded,
636+
&first_event_id
637+
)
622638
.await
623639
.expect("failed to read user room receipt for 1")
624640
.is_empty());
625641
assert!(store
626-
.get_event_room_receipt_events(room_id, ReceiptType::Read, &second_event_id)
642+
.get_event_room_receipt_events(
643+
room_id,
644+
ReceiptType::Read,
645+
ReceiptThread::Unthreaded,
646+
&second_event_id
647+
)
627648
.await
628649
.expect("failed to read user room receipt for 2")
629650
.is_empty());
@@ -633,21 +654,36 @@ macro_rules! statestore_integration_tests {
633654

634655
store.save_changes(&changes).await.expect("writing changes fauked");
635656
assert!(store
636-
.get_user_room_receipt_event(room_id, ReceiptType::Read, user_id())
657+
.get_user_room_receipt_event(
658+
room_id,
659+
ReceiptType::Read,
660+
ReceiptThread::Unthreaded,
661+
user_id()
662+
)
637663
.await
638664
.expect("failed to read user room receipt after save")
639665
.is_some());
640666
assert_eq!(
641667
store
642-
.get_event_room_receipt_events(room_id, ReceiptType::Read, &first_event_id)
668+
.get_event_room_receipt_events(
669+
room_id,
670+
ReceiptType::Read,
671+
ReceiptThread::Unthreaded,
672+
&first_event_id
673+
)
643674
.await
644675
.expect("failed to read user room receipt for 1 after save")
645676
.len(),
646677
1,
647678
"Found a wrong number of receipts for 1 after save"
648679
);
649680
assert!(store
650-
.get_event_room_receipt_events(room_id, ReceiptType::Read, &second_event_id)
681+
.get_event_room_receipt_events(
682+
room_id,
683+
ReceiptType::Read,
684+
ReceiptThread::Unthreaded,
685+
&second_event_id
686+
)
651687
.await
652688
.expect("failed to read user room receipt for 2 after save")
653689
.is_empty());
@@ -657,18 +693,33 @@ macro_rules! statestore_integration_tests {
657693

658694
store.save_changes(&changes).await.expect("Saving works");
659695
assert!(store
660-
.get_user_room_receipt_event(room_id, ReceiptType::Read, user_id())
696+
.get_user_room_receipt_event(
697+
room_id,
698+
ReceiptType::Read,
699+
ReceiptThread::Unthreaded,
700+
user_id()
701+
)
661702
.await
662703
.expect("Getting user room receipts failed")
663704
.is_some());
664705
assert!(store
665-
.get_event_room_receipt_events(room_id, ReceiptType::Read, &first_event_id)
706+
.get_event_room_receipt_events(
707+
room_id,
708+
ReceiptType::Read,
709+
ReceiptThread::Unthreaded,
710+
&first_event_id
711+
)
666712
.await
667713
.expect("Getting event room receipt events for first event failed")
668714
.is_empty());
669715
assert_eq!(
670716
store
671-
.get_event_room_receipt_events(room_id, ReceiptType::Read, &second_event_id)
717+
.get_event_room_receipt_events(
718+
room_id,
719+
ReceiptType::Read,
720+
ReceiptThread::Unthreaded,
721+
&second_event_id
722+
)
672723
.await
673724
.expect("Getting event room receipt events for second event failed")
674725
.len(),
@@ -803,14 +854,20 @@ macro_rules! statestore_integration_tests {
803854
.await?
804855
.is_none());
805856
assert!(store
806-
.get_user_room_receipt_event(room_id, ReceiptType::Read, user_id)
857+
.get_user_room_receipt_event(
858+
room_id,
859+
ReceiptType::Read,
860+
ReceiptThread::Unthreaded,
861+
user_id
862+
)
807863
.await?
808864
.is_none());
809865
assert!(
810866
store
811867
.get_event_room_receipt_events(
812868
room_id,
813869
ReceiptType::Read,
870+
ReceiptThread::Unthreaded,
814871
first_receipt_event_id()
815872
)
816873
.await?

crates/matrix-sdk-base/src/store/memory_store.rs

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use ruma::{
2525
canonical_json::redact,
2626
events::{
2727
presence::PresenceEvent,
28-
receipt::{Receipt, ReceiptType},
28+
receipt::{Receipt, ReceiptThread, ReceiptType},
2929
room::member::{MembershipState, StrippedRoomMemberEvent, SyncRoomMemberEvent},
3030
AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
3131
AnySyncStateEvent, GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
@@ -66,10 +66,17 @@ pub struct MemoryStore {
6666
stripped_joined_user_ids: Arc<DashMap<OwnedRoomId, DashSet<OwnedUserId>>>,
6767
stripped_invited_user_ids: Arc<DashMap<OwnedRoomId, DashSet<OwnedUserId>>>,
6868
presence: Arc<DashMap<OwnedUserId, Raw<PresenceEvent>>>,
69-
room_user_receipts:
70-
Arc<DashMap<OwnedRoomId, DashMap<String, DashMap<OwnedUserId, (OwnedEventId, Receipt)>>>>,
69+
room_user_receipts: Arc<
70+
DashMap<
71+
OwnedRoomId,
72+
DashMap<(String, Option<String>), DashMap<OwnedUserId, (OwnedEventId, Receipt)>>,
73+
>,
74+
>,
7175
room_event_receipts: Arc<
72-
DashMap<OwnedRoomId, DashMap<String, DashMap<OwnedEventId, DashMap<OwnedUserId, Receipt>>>>,
76+
DashMap<
77+
OwnedRoomId,
78+
DashMap<(String, Option<String>), DashMap<OwnedEventId, DashMap<OwnedUserId, Receipt>>>,
79+
>,
7380
>,
7481
custom: Arc<DashMap<Vec<u8>, Vec<u8>>>,
7582
}
@@ -317,18 +324,21 @@ impl MemoryStore {
317324
for (event_id, receipts) in &content.0 {
318325
for (receipt_type, receipts) in receipts {
319326
for (user_id, receipt) in receipts {
327+
let thread = receipt.thread.as_str().map(ToOwned::to_owned);
320328
// Add the receipt to the room user receipts
321329
if let Some((old_event, _)) = self
322330
.room_user_receipts
323331
.entry(room.clone())
324332
.or_default()
325-
.entry(receipt_type.to_string())
333+
.entry((receipt_type.to_string(), thread.clone()))
326334
.or_default()
327335
.insert(user_id.clone(), (event_id.clone(), receipt.clone()))
328336
{
329337
// Remove the old receipt from the room event receipts
330338
if let Some(receipt_map) = self.room_event_receipts.get(room) {
331-
if let Some(event_map) = receipt_map.get(receipt_type.as_ref()) {
339+
if let Some(event_map) =
340+
receipt_map.get(&(receipt_type.to_string(), thread.clone()))
341+
{
332342
if let Some(user_map) = event_map.get_mut(&old_event) {
333343
user_map.remove(user_id);
334344
}
@@ -340,7 +350,7 @@ impl MemoryStore {
340350
self.room_event_receipts
341351
.entry(room.clone())
342352
.or_default()
343-
.entry(receipt_type.to_string())
353+
.entry((receipt_type.to_string(), thread))
344354
.or_default()
345355
.entry(event_id.clone())
346356
.or_default()
@@ -509,27 +519,33 @@ impl MemoryStore {
509519
&self,
510520
room_id: &RoomId,
511521
receipt_type: ReceiptType,
522+
thread: ReceiptThread,
512523
user_id: &UserId,
513524
) -> Result<Option<(OwnedEventId, Receipt)>> {
514525
Ok(self.room_user_receipts.get(room_id).and_then(|m| {
515-
m.get(receipt_type.as_ref()).and_then(|m| m.get(user_id).map(|r| r.clone()))
526+
m.get(&(receipt_type.to_string(), thread.as_str().map(ToOwned::to_owned)))
527+
.and_then(|m| m.get(user_id).map(|r| r.clone()))
516528
}))
517529
}
518530

519531
async fn get_event_room_receipt_events(
520532
&self,
521533
room_id: &RoomId,
522534
receipt_type: ReceiptType,
535+
thread: ReceiptThread,
523536
event_id: &EventId,
524537
) -> Result<Vec<(OwnedUserId, Receipt)>> {
525538
Ok(self
526539
.room_event_receipts
527540
.get(room_id)
528541
.and_then(|m| {
529-
m.get(receipt_type.as_ref()).and_then(|m| {
530-
m.get(event_id)
531-
.map(|m| m.iter().map(|r| (r.key().clone(), r.value().clone())).collect())
532-
})
542+
m.get(&(receipt_type.to_string(), thread.as_str().map(ToOwned::to_owned))).and_then(
543+
|m| {
544+
m.get(event_id).map(|m| {
545+
m.iter().map(|r| (r.key().clone(), r.value().clone())).collect()
546+
})
547+
},
548+
)
533549
})
534550
.unwrap_or_default())
535551
}
@@ -690,18 +706,20 @@ impl StateStore for MemoryStore {
690706
&self,
691707
room_id: &RoomId,
692708
receipt_type: ReceiptType,
709+
thread: ReceiptThread,
693710
user_id: &UserId,
694711
) -> Result<Option<(OwnedEventId, Receipt)>> {
695-
self.get_user_room_receipt_event(room_id, receipt_type, user_id).await
712+
self.get_user_room_receipt_event(room_id, receipt_type, thread, user_id).await
696713
}
697714

698715
async fn get_event_room_receipt_events(
699716
&self,
700717
room_id: &RoomId,
701718
receipt_type: ReceiptType,
719+
thread: ReceiptThread,
702720
event_id: &EventId,
703721
) -> Result<Vec<(OwnedUserId, Receipt)>> {
704-
self.get_event_room_receipt_events(room_id, receipt_type, event_id).await
722+
self.get_event_room_receipt_events(room_id, receipt_type, thread, event_id).await
705723
}
706724

707725
async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {

crates/matrix-sdk-base/src/store/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use ruma::{
4848
api::client::push::get_notifications::v3::Notification,
4949
events::{
5050
presence::PresenceEvent,
51-
receipt::{Receipt, ReceiptEventContent, ReceiptType},
51+
receipt::{Receipt, ReceiptEventContent, ReceiptThread, ReceiptType},
5252
room::{
5353
member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
5454
redaction::OriginalSyncRoomRedactionEvent,
@@ -289,11 +289,14 @@ pub trait StateStore: AsyncTraitDeps {
289289
///
290290
/// * `receipt_type` - The type of the receipt.
291291
///
292+
/// * `thread` - The thread containing this receipt.
293+
///
292294
/// * `user_id` - The id of the user for who the receipt should be fetched.
293295
async fn get_user_room_receipt_event(
294296
&self,
295297
room_id: &RoomId,
296298
receipt_type: ReceiptType,
299+
thread: ReceiptThread,
297300
user_id: &UserId,
298301
) -> Result<Option<(OwnedEventId, Receipt)>>;
299302

@@ -306,12 +309,15 @@ pub trait StateStore: AsyncTraitDeps {
306309
///
307310
/// * `receipt_type` - The type of the receipts.
308311
///
312+
/// * `thread` - The thread containing this receipt.
313+
///
309314
/// * `event_id` - The id of the event for which the receipts should be
310315
/// fetched.
311316
async fn get_event_room_receipt_events(
312317
&self,
313318
room_id: &RoomId,
314319
receipt_type: ReceiptType,
320+
thread: ReceiptThread,
315321
event_id: &EventId,
316322
) -> Result<Vec<(OwnedUserId, Receipt)>>;
317323

0 commit comments

Comments
 (0)