@@ -1063,31 +1063,17 @@ mod private {
1063
1063
Ok ( Some ( ( target, related) ) )
1064
1064
}
1065
1065
1066
- /// Gives a temporary mutable handle to the underlying in-memory events,
1067
- /// and will propagate changes to the storage once done.
1068
- ///
1069
- /// Returns the updates to the linked chunk, as vector diffs, so the
1070
- /// caller may propagate such updates, if needs be.
1071
- ///
1072
- /// The function `func` takes a mutable reference to `RoomEvents`. It
1073
- /// returns a set of events that will be post-processed. At the time of
1074
- /// writing, all these events are passed to
1075
- /// `Self::maybe_apply_new_redaction`.
1076
- #[ instrument( skip_all, fields( room_id = %self . room) ) ]
1077
- async fn with_events_mut < F > (
1066
+ /// Post-process new events, after they have been added to the in-memory
1067
+ /// linked chunk.
1068
+ async fn post_process_new_events (
1078
1069
& mut self ,
1070
+ events : Vec < TimelineEvent > ,
1079
1071
is_live_sync : bool ,
1080
- func : F ,
1081
- ) -> Result < ( ) , EventCacheError >
1082
- where
1083
- F : FnOnce ( & mut RoomEvents ) -> Vec < TimelineEvent > ,
1084
- {
1085
- let events_to_post_process = func ( & mut self . events ) ;
1086
-
1072
+ ) -> Result < ( ) , EventCacheError > {
1087
1073
// Update the store before doing the post-processing.
1088
1074
self . propagate_changes ( ) . await ?;
1089
1075
1090
- for event in events_to_post_process {
1076
+ for event in events {
1091
1077
self . maybe_apply_new_redaction ( & event) . await ?;
1092
1078
1093
1079
self . analyze_thread_root ( & event, is_live_sync) . await ?;
@@ -1352,34 +1338,31 @@ mod private {
1352
1338
1353
1339
// Add the previous back-pagination token (if present), followed by the timeline
1354
1340
// events themselves.
1355
- self . with_events_mut ( true , |room_events| {
1356
- if let Some ( prev_token) = & prev_batch {
1357
- // As a tiny optimization: remove the last chunk if it's an empty event
1358
- // one, as it's not useful to keep it before a gap.
1359
- let prev_chunk_to_remove = room_events. rchunks ( ) . next ( ) . and_then ( |chunk| {
1360
- ( chunk. is_items ( ) && chunk. num_items ( ) == 0 ) . then_some ( chunk. identifier ( ) )
1361
- } ) ;
1341
+ if let Some ( prev_token) = & prev_batch {
1342
+ // As a tiny optimization: remove the last chunk if it's an empty event
1343
+ // one, as it's not useful to keep it before a gap.
1344
+ let prev_chunk_to_remove = self . events . rchunks ( ) . next ( ) . and_then ( |chunk| {
1345
+ ( chunk. is_items ( ) && chunk. num_items ( ) == 0 ) . then_some ( chunk. identifier ( ) )
1346
+ } ) ;
1362
1347
1363
- room_events . push_gap ( Gap { prev_token : prev_token. clone ( ) } ) ;
1348
+ self . events . push_gap ( Gap { prev_token : prev_token. clone ( ) } ) ;
1364
1349
1365
- if let Some ( prev_chunk_to_remove) = prev_chunk_to_remove {
1366
- room_events. remove_empty_chunk_at ( prev_chunk_to_remove) . expect (
1367
- "we just checked the chunk is there, and it's an empty item chunk" ,
1368
- ) ;
1369
- }
1350
+ // If we've never waited for an initial previous-batch token, and we've now
1351
+ // inserted a gap, no need to wait for a previous-batch token later.
1352
+ if !self . waited_for_initial_prev_token && prev_batch. is_some ( ) {
1353
+ self . waited_for_initial_prev_token = true ;
1370
1354
}
1371
1355
1372
- room_events. push_events ( events. clone ( ) ) ;
1356
+ if let Some ( prev_chunk_to_remove) = prev_chunk_to_remove {
1357
+ self . events
1358
+ . remove_empty_chunk_at ( prev_chunk_to_remove)
1359
+ . expect ( "we just checked the chunk is there, and it's an empty item chunk" ) ;
1360
+ }
1361
+ }
1373
1362
1374
- events
1375
- } )
1376
- . await ?;
1363
+ self . events . push_events ( events. clone ( ) ) ;
1377
1364
1378
- // If we've never waited for an initial previous-batch token, and we've now
1379
- // inserted a gap, no need to wait for a previous-batch token later.
1380
- if !self . waited_for_initial_prev_token && prev_batch. is_some ( ) {
1381
- self . waited_for_initial_prev_token = true ;
1382
- }
1365
+ self . post_process_new_events ( events, true ) . await ?;
1383
1366
1384
1367
if timeline. limited && prev_batch. is_some ( ) {
1385
1368
// If there was a previous batch token for a limited timeline, unload the chunks
@@ -1447,67 +1430,64 @@ mod private {
1447
1430
new_gap = None ;
1448
1431
} ;
1449
1432
1450
- self . with_events_mut ( false , |room_events| {
1451
- // Reverse the order of the events as `/messages` has been called with `dir=b`
1452
- // (backwards). The `RoomEvents` API expects the first event to be the oldest.
1453
- // Let's re-order them for this block.
1454
- let reversed_events = events. iter ( ) . rev ( ) . cloned ( ) . collect :: < Vec < _ > > ( ) ;
1433
+ // Reverse the order of the events as `/messages` has been called with `dir=b`
1434
+ // (backwards). The `RoomEvents` API expects the first event to be the oldest.
1435
+ // Let's re-order them for this block.
1436
+ let reversed_events = events. iter ( ) . rev ( ) . cloned ( ) . collect :: < Vec < _ > > ( ) ;
1455
1437
1456
- let first_event_pos = room_events . events ( ) . next ( ) . map ( |( item_pos, _) | item_pos) ;
1438
+ let first_event_pos = self . events . events ( ) . next ( ) . map ( |( item_pos, _) | item_pos) ;
1457
1439
1458
- // First, insert events.
1459
- let insert_new_gap_pos = if let Some ( gap_id) = prev_gap_id {
1460
- // There is a prior gap, let's replace it by new events!
1461
- if all_duplicates {
1462
- assert ! ( reversed_events. is_empty( ) ) ;
1463
- }
1440
+ // First, insert events.
1441
+ let insert_new_gap_pos = if let Some ( gap_id) = prev_gap_id {
1442
+ // There is a prior gap, let's replace it by new events!
1443
+ if all_duplicates {
1444
+ assert ! ( reversed_events. is_empty( ) ) ;
1445
+ }
1464
1446
1465
- trace ! ( "replacing previous gap with the back-paginated events" ) ;
1447
+ trace ! ( "replacing previous gap with the back-paginated events" ) ;
1466
1448
1467
- // Replace the gap with the events we just deduplicated. This might get rid of
1468
- // the underlying gap, if the conditions are favorable to
1469
- // us.
1470
- room_events
1471
- . replace_gap_at ( reversed_events. clone ( ) , gap_id)
1472
- . expect ( "gap_identifier is a valid chunk id we read previously" )
1473
- } else if let Some ( pos) = first_event_pos {
1474
- // No prior gap, but we had some events: assume we need to prepend events
1475
- // before those.
1476
- trace ! ( "inserted events before the first known event" ) ;
1449
+ // Replace the gap with the events we just deduplicated. This might get rid of
1450
+ // the underlying gap, if the conditions are favorable to
1451
+ // us.
1452
+ self . events
1453
+ . replace_gap_at ( reversed_events. clone ( ) , gap_id)
1454
+ . expect ( "gap_identifier is a valid chunk id we read previously" )
1455
+ } else if let Some ( pos) = first_event_pos {
1456
+ // No prior gap, but we had some events: assume we need to prepend events
1457
+ // before those.
1458
+ trace ! ( "inserted events before the first known event" ) ;
1477
1459
1478
- room_events
1479
- . insert_events_at ( reversed_events. clone ( ) , pos)
1480
- . expect ( "pos is a valid position we just read above" ) ;
1460
+ self . events
1461
+ . insert_events_at ( reversed_events. clone ( ) , pos)
1462
+ . expect ( "pos is a valid position we just read above" ) ;
1481
1463
1482
- Some ( pos)
1483
- } else {
1484
- // No prior gap, and no prior events: push the events.
1485
- trace ! ( "pushing events received from back-pagination" ) ;
1464
+ Some ( pos)
1465
+ } else {
1466
+ // No prior gap, and no prior events: push the events.
1467
+ trace ! ( "pushing events received from back-pagination" ) ;
1486
1468
1487
- room_events . push_events ( reversed_events. clone ( ) ) ;
1469
+ self . events . push_events ( reversed_events. clone ( ) ) ;
1488
1470
1489
- // A new gap may be inserted before the new events, if there are any.
1490
- room_events . events ( ) . next ( ) . map ( |( item_pos, _) | item_pos)
1491
- } ;
1471
+ // A new gap may be inserted before the new events, if there are any.
1472
+ self . events . events ( ) . next ( ) . map ( |( item_pos, _) | item_pos)
1473
+ } ;
1492
1474
1493
- // And insert the new gap if needs be.
1494
- //
1495
- // We only do this when at least one new, non-duplicated event, has been added
1496
- // to the chunk. Otherwise it means we've back-paginated all the
1497
- // known events.
1498
- if let Some ( new_gap) = new_gap {
1499
- if let Some ( new_pos) = insert_new_gap_pos {
1500
- room_events
1501
- . insert_gap_at ( new_gap, new_pos)
1502
- . expect ( "events_chunk_pos represents a valid chunk position" ) ;
1503
- } else {
1504
- room_events. push_gap ( new_gap) ;
1505
- }
1475
+ // And insert the new gap if needs be.
1476
+ //
1477
+ // We only do this when at least one new, non-duplicated event, has been added
1478
+ // to the chunk. Otherwise it means we've back-paginated all the
1479
+ // known events.
1480
+ if let Some ( new_gap) = new_gap {
1481
+ if let Some ( new_pos) = insert_new_gap_pos {
1482
+ self . events
1483
+ . insert_gap_at ( new_gap, new_pos)
1484
+ . expect ( "events_chunk_pos represents a valid chunk position" ) ;
1485
+ } else {
1486
+ self . events . push_gap ( new_gap) ;
1506
1487
}
1488
+ }
1507
1489
1508
- reversed_events
1509
- } )
1510
- . await ?;
1490
+ self . post_process_new_events ( reversed_events, false ) . await ?;
1511
1491
1512
1492
// There could be an inconsistency between the network (which thinks we hit the
1513
1493
// start of the timeline) and the disk (which has the initial empty
0 commit comments