@@ -387,27 +387,32 @@ impl RoomEventCacheInner {
387
387
// Add all the events to the backend.
388
388
trace ! ( "adding new events" ) ;
389
389
390
- let stored_prev_batch_token =
391
- self . state . write ( ) . await . handle_sync ( timeline, & self . sender ) . await ?;
390
+ let ( stored_prev_batch_token, timeline_event_diffs ) =
391
+ self . state . write ( ) . await . handle_sync ( timeline) . await ?;
392
392
393
393
// Now that all events have been added, we can trigger the
394
394
// `pagination_token_notifier`.
395
395
if stored_prev_batch_token {
396
396
self . pagination_batch_token_notifier . notify_one ( ) ;
397
397
}
398
398
399
- // State sent the timeline diff updates, so we can send updates about the
400
- // related events now .
401
- {
402
- if !ephemeral_events . is_empty ( ) {
403
- let _ = self
404
- . sender
405
- . send ( RoomEventCacheUpdate :: AddEphemeralEvents { events : ephemeral_events } ) ;
406
- }
399
+ // The order matters here: first send the timeline event diffs, then only the
400
+ // related events (read receipts, etc.) .
401
+ if !timeline_event_diffs . is_empty ( ) {
402
+ let _ = self . sender . send ( RoomEventCacheUpdate :: UpdateTimelineEvents {
403
+ diffs : timeline_event_diffs ,
404
+ origin : EventsOrigin :: Sync ,
405
+ } ) ;
406
+ }
407
407
408
- if !ambiguity_changes. is_empty ( ) {
409
- let _ = self . sender . send ( RoomEventCacheUpdate :: UpdateMembers { ambiguity_changes } ) ;
410
- }
408
+ if !ephemeral_events. is_empty ( ) {
409
+ let _ = self
410
+ . sender
411
+ . send ( RoomEventCacheUpdate :: AddEphemeralEvents { events : ephemeral_events } ) ;
412
+ }
413
+
414
+ if !ambiguity_changes. is_empty ( ) {
415
+ let _ = self . sender . send ( RoomEventCacheUpdate :: UpdateMembers { ambiguity_changes } ) ;
411
416
}
412
417
413
418
Ok ( ( ) )
@@ -470,7 +475,6 @@ mod private {
470
475
serde:: Raw ,
471
476
EventId , OwnedEventId , OwnedRoomId , RoomVersionId ,
472
477
} ;
473
- use tokio:: sync:: broadcast:: Sender ;
474
478
use tracing:: { debug, error, instrument, trace, warn} ;
475
479
476
480
use super :: {
@@ -479,8 +483,7 @@ mod private {
479
483
sort_positions_descending, EventLocation , LoadMoreEventsBackwardsOutcome ,
480
484
} ;
481
485
use crate :: event_cache:: {
482
- deduplicator:: filter_duplicate_events, BackPaginationOutcome , EventsOrigin ,
483
- RoomEventCacheUpdate , RoomPaginationStatus ,
486
+ deduplicator:: filter_duplicate_events, BackPaginationOutcome , RoomPaginationStatus ,
484
487
} ;
485
488
486
489
/// State for a single room's event cache.
@@ -776,7 +779,7 @@ mod private {
776
779
777
780
/// Automatically shrink the room if there are no listeners, as
778
781
/// indicated by the atomic number of active listeners.
779
- #[ must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`" ]
782
+ #[ must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`" ]
780
783
pub ( crate ) async fn auto_shrink_if_no_listeners (
781
784
& mut self ,
782
785
) -> Result < Option < Vec < VectorDiff < TimelineEvent > > > , EventCacheError > {
@@ -952,7 +955,7 @@ mod private {
952
955
/// Return a single diff update that is a clear of all events; as a
953
956
/// result, the caller may override any pending diff updates
954
957
/// with the result of this function.
955
- #[ must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`" ]
958
+ #[ must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`" ]
956
959
pub async fn reset ( & mut self ) -> Result < Vec < VectorDiff < TimelineEvent > > , EventCacheError > {
957
960
self . reset_internal ( ) . await ?;
958
961
@@ -1292,11 +1295,11 @@ mod private {
1292
1295
///
1293
1296
/// Returns true if a new gap (previous-batch token) has been inserted,
1294
1297
/// false otherwise.
1298
+ #[ must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`" ]
1295
1299
pub async fn handle_sync (
1296
1300
& mut self ,
1297
1301
mut timeline : Timeline ,
1298
- sender : & Sender < RoomEventCacheUpdate > ,
1299
- ) -> Result < bool , EventCacheError > {
1302
+ ) -> Result < ( bool , Vec < VectorDiff < TimelineEvent > > ) , EventCacheError > {
1300
1303
let mut prev_batch = timeline. prev_batch . take ( ) ;
1301
1304
1302
1305
let (
@@ -1327,7 +1330,7 @@ mod private {
1327
1330
if all_duplicates {
1328
1331
// No new events and no gap (per the previous check), thus no need to change the
1329
1332
// room state. We're done!
1330
- return Ok ( false ) ;
1333
+ return Ok ( ( false , Vec :: new ( ) ) ) ;
1331
1334
}
1332
1335
1333
1336
// Remove the old duplicated events.
@@ -1376,23 +1379,18 @@ mod private {
1376
1379
}
1377
1380
1378
1381
let timeline_event_diffs = self . events . updates_as_vector_diffs ( ) ;
1379
- if !timeline_event_diffs. is_empty ( ) {
1380
- let _ = sender. send ( RoomEventCacheUpdate :: UpdateTimelineEvents {
1381
- diffs : timeline_event_diffs,
1382
- origin : EventsOrigin :: Sync ,
1383
- } ) ;
1384
- }
1385
1382
1386
- Ok ( prev_batch. is_some ( ) )
1383
+ Ok ( ( prev_batch. is_some ( ) , timeline_event_diffs ) )
1387
1384
}
1388
1385
1386
+ #[ must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`" ]
1389
1387
pub async fn handle_backpagination (
1390
1388
& mut self ,
1391
1389
events : Vec < TimelineEvent > ,
1392
1390
mut new_gap : Option < Gap > ,
1393
1391
prev_gap_id : Option < ChunkIdentifier > ,
1394
- sender : & Sender < RoomEventCacheUpdate > ,
1395
- ) -> Result < BackPaginationOutcome , EventCacheError > {
1392
+ ) -> Result < ( BackPaginationOutcome , Vec < VectorDiff < TimelineEvent > > ) , EventCacheError >
1393
+ {
1396
1394
// If there's no new gap (previous batch token), then we've reached the start of
1397
1395
// the timeline.
1398
1396
let network_reached_start = new_gap. is_none ( ) ;
@@ -1516,17 +1514,10 @@ mod private {
1516
1514
reached_start
1517
1515
} ;
1518
1516
1519
- let backpagination_outcome = BackPaginationOutcome { events, reached_start } ;
1520
-
1521
1517
let event_diffs = self . events . updates_as_vector_diffs ( ) ;
1522
- if !event_diffs. is_empty ( ) {
1523
- let _ = sender. send ( RoomEventCacheUpdate :: UpdateTimelineEvents {
1524
- diffs : event_diffs,
1525
- origin : EventsOrigin :: Pagination ,
1526
- } ) ;
1527
- }
1518
+ let backpagination_outcome = BackPaginationOutcome { events, reached_start } ;
1528
1519
1529
- Ok ( backpagination_outcome)
1520
+ Ok ( ( backpagination_outcome, event_diffs ) )
1530
1521
}
1531
1522
}
1532
1523
}
0 commit comments