@@ -594,7 +594,7 @@ mod private {
594
594
/// possibly misplace them. And we should not be missing
595
595
/// events either: the already-known events would have their own
596
596
/// previous-batch token (it might already be consumed).
597
- pub async fn collect_valid_and_duplicated_events (
597
+ async fn collect_valid_and_duplicated_events (
598
598
& mut self ,
599
599
events : Vec < Event > ,
600
600
) -> Result < ( DeduplicationOutcome , bool ) , EventCacheError > {
@@ -850,13 +850,12 @@ mod private {
850
850
///
851
851
/// This method is purposely isolated because it must ensure that
852
852
/// positions are sorted appropriately or it can be disastrous.
853
- #[ must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`" ]
854
853
#[ instrument( skip_all) ]
855
- pub ( crate ) async fn remove_events (
854
+ async fn remove_events (
856
855
& mut self ,
857
856
in_memory_events : Vec < ( OwnedEventId , Position ) > ,
858
857
in_store_events : Vec < ( OwnedEventId , Position ) > ,
859
- ) -> Result < Vec < VectorDiff < TimelineEvent > > , EventCacheError > {
858
+ ) -> Result < ( ) , EventCacheError > {
860
859
// In-store events.
861
860
if !in_store_events. is_empty ( ) {
862
861
let mut positions = in_store_events
@@ -878,7 +877,7 @@ mod private {
878
877
// In-memory events.
879
878
if in_memory_events. is_empty ( ) {
880
879
// Nothing else to do, return early.
881
- return Ok ( Vec :: new ( ) ) ;
880
+ return Ok ( ( ) ) ;
882
881
}
883
882
884
883
// `remove_events_by_position` is responsible of sorting positions.
@@ -888,9 +887,7 @@ mod private {
888
887
)
889
888
. expect ( "failed to remove an event" ) ;
890
889
891
- self . propagate_changes ( ) . await ?;
892
-
893
- Ok ( self . events . updates_as_vector_diffs ( ) )
890
+ self . propagate_changes ( ) . await
894
891
}
895
892
896
893
/// Propagate changes to the underlying storage.
@@ -899,7 +896,7 @@ mod private {
899
896
self . send_updates_to_store ( updates) . await
900
897
}
901
898
902
- pub async fn send_updates_to_store (
899
+ async fn send_updates_to_store (
903
900
& mut self ,
904
901
mut updates : Vec < Update < TimelineEvent , Gap > > ,
905
902
) -> Result < ( ) , EventCacheError > {
@@ -991,7 +988,7 @@ mod private {
991
988
) -> Result < Option < ( EventLocation , TimelineEvent ) > , EventCacheError > {
992
989
// There are supposedly fewer events loaded in memory than in the store. Let's
993
990
// start by looking up in the `RoomEvents`.
994
- for ( position, event) in self . events ( ) . revents ( ) {
991
+ for ( position, event) in self . events . revents ( ) {
995
992
if event. event_id ( ) . as_deref ( ) == Some ( event_id) {
996
993
return Ok ( Some ( ( EventLocation :: Memory ( position) , event. clone ( ) ) ) ) ;
997
994
}
@@ -1069,13 +1066,12 @@ mod private {
1069
1066
/// returns a set of events that will be post-processed. At the time of
1070
1067
/// writing, all these events are passed to
1071
1068
/// `Self::maybe_apply_new_redaction`.
1072
- #[ must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`" ]
1073
1069
#[ instrument( skip_all, fields( room_id = %self . room) ) ]
1074
- pub async fn with_events_mut < F > (
1070
+ async fn with_events_mut < F > (
1075
1071
& mut self ,
1076
1072
is_live_sync : bool ,
1077
1073
func : F ,
1078
- ) -> Result < Vec < VectorDiff < TimelineEvent > > , EventCacheError >
1074
+ ) -> Result < ( ) , EventCacheError >
1079
1075
where
1080
1076
F : FnOnce ( & mut RoomEvents ) -> Vec < TimelineEvent > ,
1081
1077
{
@@ -1103,9 +1099,7 @@ mod private {
1103
1099
self . waited_for_initial_prev_token = true ;
1104
1100
}
1105
1101
1106
- let updates_as_vector_diffs = self . events . updates_as_vector_diffs ( ) ;
1107
-
1108
- Ok ( updates_as_vector_diffs)
1102
+ Ok ( ( ) )
1109
1103
}
1110
1104
1111
1105
/// If the event is a threaded reply, ensure the related thread's root
@@ -1350,52 +1344,43 @@ mod private {
1350
1344
return Ok ( false ) ;
1351
1345
}
1352
1346
1353
- // During a sync, when a duplicated event is found, the old event is removed and
1354
- // the new event is added.
1355
- //
1356
- // Let's remove the old events that are duplicated.
1357
-
1358
1347
// Remove the old duplicated events.
1359
1348
//
1360
- // We don't have to worry the removals can change the position of the
1361
- // existing events, because we are pushing all _new_
1362
- // `events` at the back.
1363
- let mut timeline_event_diffs = self
1364
- . remove_events ( in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
1349
+ // We don't have to worry the removals can change the position of the existing
1350
+ // events, because we are pushing all _new_ `events` at the back.
1351
+ self . remove_events ( in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
1365
1352
. await ?;
1366
1353
1367
1354
// Add the previous back-pagination token (if present), followed by the timeline
1368
1355
// events themselves.
1369
- let new_timeline_event_diffs = self
1370
- . with_events_mut ( true , |room_events| {
1371
- // If we only received duplicated events, we don't need to store the gap: if
1372
- // there was a gap, we'd have received an unknown event at the tail of
1373
- // the room's timeline (unless the server reordered sync events since the
1374
- // last time we sync'd).
1375
- if let Some ( prev_token) = & prev_batch {
1376
- // As a tiny optimization: remove the last chunk if it's an empty event
1377
- // one, as it's not useful to keep it before a gap.
1378
- let prev_chunk_to_remove = room_events. rchunks ( ) . next ( ) . and_then ( |chunk| {
1379
- ( chunk. is_items ( ) && chunk. num_items ( ) == 0 )
1380
- . then_some ( chunk. identifier ( ) )
1381
- } ) ;
1382
-
1383
- room_events. push_gap ( Gap { prev_token : prev_token. clone ( ) } ) ;
1384
-
1385
- if let Some ( prev_chunk_to_remove) = prev_chunk_to_remove {
1386
- room_events. remove_empty_chunk_at ( prev_chunk_to_remove) . expect (
1387
- "we just checked the chunk is there, and it's an empty item chunk" ,
1388
- ) ;
1389
- }
1356
+ self . with_events_mut ( true , |room_events| {
1357
+ // If we only received duplicated events, we don't need to store the gap: if
1358
+ // there was a gap, we'd have received an unknown event at the tail of
1359
+ // the room's timeline (unless the server reordered sync events since the
1360
+ // last time we sync'd).
1361
+ if let Some ( prev_token) = & prev_batch {
1362
+ // As a tiny optimization: remove the last chunk if it's an empty event
1363
+ // one, as it's not useful to keep it before a gap.
1364
+ let prev_chunk_to_remove = room_events. rchunks ( ) . next ( ) . and_then ( |chunk| {
1365
+ ( chunk. is_items ( ) && chunk. num_items ( ) == 0 ) . then_some ( chunk. identifier ( ) )
1366
+ } ) ;
1367
+
1368
+ room_events. push_gap ( Gap { prev_token : prev_token. clone ( ) } ) ;
1369
+
1370
+ if let Some ( prev_chunk_to_remove) = prev_chunk_to_remove {
1371
+ room_events. remove_empty_chunk_at ( prev_chunk_to_remove) . expect (
1372
+ "we just checked the chunk is there, and it's an empty item chunk" ,
1373
+ ) ;
1390
1374
}
1375
+ }
1391
1376
1392
- room_events. push_events ( events. clone ( ) ) ;
1377
+ room_events. push_events ( events. clone ( ) ) ;
1393
1378
1394
- events. clone ( )
1395
- } )
1396
- . await ?;
1379
+ events
1380
+ } )
1381
+ . await ?;
1397
1382
1398
- timeline_event_diffs. extend ( new_timeline_event_diffs ) ;
1383
+ let mut timeline_event_diffs = None ;
1399
1384
1400
1385
if timeline. limited && prev_batch. is_some ( ) {
1401
1386
// If there was a previous batch token for a limited timeline, unload the chunks
@@ -1404,13 +1389,14 @@ mod private {
1404
1389
//
1405
1390
// We must do this *after* the above call to `.with_events_mut`, so the new
1406
1391
// events and gaps are properly persisted to storage.
1407
- if let Some ( diffs) = self . shrink_to_last_chunk ( ) . await ? {
1408
- // Override the diffs with the new ones, as per `shrink_to_last_chunk`'s API
1409
- // contract.
1410
- timeline_event_diffs = diffs;
1411
- }
1392
+
1393
+ // TODO(bnjbvr): could this method not return diff updates?
1394
+ timeline_event_diffs = self . shrink_to_last_chunk ( ) . await ?;
1412
1395
}
1413
1396
1397
+ let timeline_event_diffs =
1398
+ timeline_event_diffs. unwrap_or_else ( || self . events . updates_as_vector_diffs ( ) ) ;
1399
+
1414
1400
if !timeline_event_diffs. is_empty ( ) {
1415
1401
let _ = sender. send ( RoomEventCacheUpdate :: UpdateTimelineEvents {
1416
1402
diffs : timeline_event_diffs,
@@ -1454,85 +1440,81 @@ mod private {
1454
1440
// ordering of [D, E, F, A, B, C], which is incorrect. So we do have to remove
1455
1441
// all the events, in case this happens (see also #4746).
1456
1442
1457
- let mut event_diffs = if !all_duplicates {
1443
+ if !all_duplicates {
1458
1444
// Let's forget all the previous events.
1459
1445
self . remove_events ( in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
1460
- . await ?
1446
+ . await ?;
1461
1447
} else {
1462
1448
// All new events are duplicated, they can all be ignored.
1463
1449
events. clear ( ) ;
1464
- Default :: default ( )
1465
1450
} ;
1466
1451
1467
- let next_diffs = self
1468
- . with_events_mut ( false , |room_events| {
1469
- // Reverse the order of the events as `/messages` has been called with `dir=b`
1470
- // (backwards). The `RoomEvents` API expects the first event to be the oldest.
1471
- // Let's re-order them for this block.
1472
- let reversed_events = events
1473
- . iter ( )
1474
- . rev ( )
1475
- . cloned ( )
1476
- . collect :: < Vec < _ > > ( ) ;
1477
-
1478
- let first_event_pos = room_events. events ( ) . next ( ) . map ( |( item_pos, _) | item_pos) ;
1479
-
1480
- // First, insert events.
1481
- let insert_new_gap_pos = if let Some ( gap_id) = prev_gap_id {
1482
- // There is a prior gap, let's replace it by new events!
1483
- if all_duplicates {
1484
- assert ! ( reversed_events. is_empty( ) ) ;
1485
- }
1452
+ self . with_events_mut ( false , |room_events| {
1453
+ // Reverse the order of the events as `/messages` has been called with `dir=b`
1454
+ // (backwards). The `RoomEvents` API expects the first event to be the oldest.
1455
+ // Let's re-order them for this block.
1456
+ let reversed_events = events
1457
+ . iter ( )
1458
+ . rev ( )
1459
+ . cloned ( )
1460
+ . collect :: < Vec < _ > > ( ) ;
1461
+
1462
+ let first_event_pos = room_events. events ( ) . next ( ) . map ( |( item_pos, _) | item_pos) ;
1486
1463
1487
- trace ! ( "replacing previous gap with the back-paginated events" ) ;
1464
+ // First, insert events.
1465
+ let insert_new_gap_pos = if let Some ( gap_id) = prev_gap_id {
1466
+ // There is a prior gap, let's replace it by new events!
1467
+ if all_duplicates {
1468
+ assert ! ( reversed_events. is_empty( ) ) ;
1469
+ }
1488
1470
1489
- // Replace the gap with the events we just deduplicated. This might get rid of the
1490
- // underlying gap, if the conditions are favorable to us.
1491
- room_events. replace_gap_at ( reversed_events. clone ( ) , gap_id)
1492
- . expect ( "gap_identifier is a valid chunk id we read previously" )
1493
- } else if let Some ( pos) = first_event_pos {
1494
- // No prior gap, but we had some events: assume we need to prepend events
1495
- // before those.
1496
- trace ! ( "inserted events before the first known event" ) ;
1471
+ trace ! ( "replacing previous gap with the back-paginated events" ) ;
1497
1472
1498
- room_events
1499
- . insert_events_at ( reversed_events. clone ( ) , pos)
1500
- . expect ( "pos is a valid position we just read above" ) ;
1473
+ // Replace the gap with the events we just deduplicated. This might get rid of the
1474
+ // underlying gap, if the conditions are favorable to us.
1475
+ room_events. replace_gap_at ( reversed_events. clone ( ) , gap_id)
1476
+ . expect ( "gap_identifier is a valid chunk id we read previously" )
1477
+ } else if let Some ( pos) = first_event_pos {
1478
+ // No prior gap, but we had some events: assume we need to prepend events
1479
+ // before those.
1480
+ trace ! ( "inserted events before the first known event" ) ;
1501
1481
1502
- Some ( pos)
1503
- } else {
1504
- // No prior gap, and no prior events: push the events.
1505
- trace ! ( "pushing events received from back-pagination" ) ;
1482
+ room_events
1483
+ . insert_events_at ( reversed_events. clone ( ) , pos)
1484
+ . expect ( "pos is a valid position we just read above" ) ;
1506
1485
1507
- room_events. push_events ( reversed_events. clone ( ) ) ;
1486
+ Some ( pos)
1487
+ } else {
1488
+ // No prior gap, and no prior events: push the events.
1489
+ trace ! ( "pushing events received from back-pagination" ) ;
1508
1490
1509
- // A new gap may be inserted before the new events, if there are any.
1510
- room_events. events ( ) . next ( ) . map ( |( item_pos, _) | item_pos)
1511
- } ;
1491
+ room_events. push_events ( reversed_events. clone ( ) ) ;
1512
1492
1513
- // And insert the new gap if needs be.
1514
- //
1515
- // We only do this when at least one new, non-duplicated event, has been added to
1516
- // the chunk. Otherwise it means we've back-paginated all the known events.
1517
- if !all_duplicates {
1518
- if let Some ( new_gap) = new_gap {
1519
- if let Some ( new_pos) = insert_new_gap_pos {
1520
- room_events
1521
- . insert_gap_at ( new_gap, new_pos)
1522
- . expect ( "events_chunk_pos represents a valid chunk position" ) ;
1523
- } else {
1524
- room_events. push_gap ( new_gap) ;
1493
+ // A new gap may be inserted before the new events, if there are any.
1494
+ room_events. events ( ) . next ( ) . map ( |( item_pos, _) | item_pos)
1495
+ } ;
1496
+
1497
+ // And insert the new gap if needs be.
1498
+ //
1499
+ // We only do this when at least one new, non-duplicated event, has been added to
1500
+ // the chunk. Otherwise it means we've back-paginated all the known events.
1501
+ if !all_duplicates {
1502
+ if let Some ( new_gap) = new_gap {
1503
+ if let Some ( new_pos) = insert_new_gap_pos {
1504
+ room_events
1505
+ . insert_gap_at ( new_gap, new_pos)
1506
+ . expect ( "events_chunk_pos represents a valid chunk position" ) ;
1507
+ } else {
1508
+ room_events. push_gap ( new_gap) ;
1509
+ }
1525
1510
}
1511
+ } else {
1512
+ debug ! ( "not storing previous batch token, because we deduplicated all new back-paginated events" ) ;
1526
1513
}
1527
- } else {
1528
- debug ! ( "not storing previous batch token, because we deduplicated all new back-paginated events" ) ;
1529
- }
1530
-
1531
- reversed_events
1532
- } )
1533
- . await ?;
1534
1514
1535
- event_diffs. extend ( next_diffs) ;
1515
+ reversed_events
1516
+ } )
1517
+ . await ?;
1536
1518
1537
1519
// There could be an inconsistency between the network (which thinks we hit the
1538
1520
// start of the timeline) and the disk (which has the initial empty
@@ -1562,6 +1544,7 @@ mod private {
1562
1544
1563
1545
let backpagination_outcome = BackPaginationOutcome { events, reached_start } ;
1564
1546
1547
+ let event_diffs = self . events . updates_as_vector_diffs ( ) ;
1565
1548
if !event_diffs. is_empty ( ) {
1566
1549
let _ = sender. send ( RoomEventCacheUpdate :: UpdateTimelineEvents {
1567
1550
diffs : event_diffs,
0 commit comments