@@ -24,7 +24,7 @@ use std::{
     },
 };
 
-use events::{sort_positions_descending, Gap};
+use events::sort_positions_descending;
 use eyeball::SharedObservable;
 use eyeball_im::VectorDiff;
 use matrix_sdk_base::{
@@ -44,8 +44,8 @@ use tokio::sync::{
 use tracing::{instrument, trace, warn};
 
 use super::{
-    deduplicator::DeduplicationOutcome, AutoShrinkChannelPayload, EventsOrigin, Result,
-    RoomEventCacheUpdate, RoomPagination, RoomPaginationStatus,
+    AutoShrinkChannelPayload, EventsOrigin, Result, RoomEventCacheUpdate, RoomPagination,
+    RoomPaginationStatus,
 };
 use crate::{client::WeakClient, room::WeakRoom};
 
@@ -376,9 +376,8 @@ impl RoomEventCacheInner {
         ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
         ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
     ) -> Result<()> {
-        let mut prev_batch = timeline.prev_batch;
         if timeline.events.is_empty()
-            && prev_batch.is_none()
+            && timeline.prev_batch.is_none()
             && ephemeral_events.is_empty()
             && ambiguity_changes.is_empty()
         {
@@ -388,115 +387,18 @@ impl RoomEventCacheInner {
         // Add all the events to the backend.
         trace!("adding new events");
 
-        let mut state = self.state.write().await;
-
-        let (
-            DeduplicationOutcome {
-                all_events: events,
-                in_memory_duplicated_event_ids,
-                in_store_duplicated_event_ids,
-            },
-            all_duplicates,
-        ) = state.collect_valid_and_duplicated_events(timeline.events).await?;
-
-        // If the timeline isn't limited, and we already knew about some past events,
-        // then this definitely know what the timeline head is (either we know
-        // about all the events persisted in storage, or we have a gap
-        // somewhere). In this case, we can ditch the previous-batch
-        // token, which is an optimization to avoid unnecessary future back-pagination
-        // requests.
-        //
-        // We can also ditch it, if we knew about all the events that came from sync,
-        // viz. they were all deduplicated. In this case, using the
-        // previous-batch token would only result in fetching other events we
-        // knew about. This is slightly incorrect in the presence of
-        // network splits, but this has shown to be Good Enough™.
-        if !timeline.limited && state.events().events().next().is_some() || all_duplicates {
-            prev_batch = None;
-        }
-
-        // During a sync, when a duplicated event is found, the old event is removed and
-        // the new event is added.
-        //
-        // Let's remove the old events that are duplicated.
-        let timeline_event_diffs = if all_duplicates {
-            // No new events, thus no need to change the room events.
-            vec![]
-        } else {
-            // Remove the old duplicated events.
-            //
-            // We don't have to worry the removals can change the position of the
-            // existing events, because we are pushing all _new_
-            // `events` at the back.
-            let mut timeline_event_diffs = state
-                .remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
-                .await?;
-
-            // Add the previous back-pagination token (if present), followed by the timeline
-            // events themselves.
-            let new_timeline_event_diffs = state
-                .with_events_mut(true, |room_events| {
-                    // If we only received duplicated events, we don't need to store the gap: if
-                    // there was a gap, we'd have received an unknown event at the tail of
-                    // the room's timeline (unless the server reordered sync events since the last
-                    // time we sync'd).
-                    if let Some(prev_token) = &prev_batch {
-                        // As a tiny optimization: remove the last chunk if it's an empty event
-                        // one, as it's not useful to keep it before a gap.
-                        let prev_chunk_to_remove = room_events.rchunks().next().and_then(|chunk| {
-                            (chunk.is_items() && chunk.num_items() == 0)
-                                .then_some(chunk.identifier())
-                        });
-
-                        room_events.push_gap(Gap { prev_token: prev_token.clone() });
-
-                        if let Some(prev_chunk_to_remove) = prev_chunk_to_remove {
-                            room_events.remove_empty_chunk_at(prev_chunk_to_remove).expect(
-                                "we just checked the chunk is there, and it's an empty item chunk",
-                            );
-                        }
-                    }
-
-                    room_events.push_events(events.clone());
-
-                    events.clone()
-                })
-                .await?;
-
-            timeline_event_diffs.extend(new_timeline_event_diffs);
-
-            if timeline.limited && prev_batch.is_some() {
-                // If there was a previous batch token for a limited timeline, unload the chunks
-                // so it only contains the last one; otherwise, there might be a
-                // valid gap in between, and observers may not render it (yet).
-                //
-                // We must do this *after* the above call to `.with_events_mut`, so the new
-                // events and gaps are properly persisted to storage.
-                if let Some(diffs) = state.shrink_to_last_chunk().await? {
-                    // Override the diffs with the new ones, as per `shrink_to_last_chunk`'s API
-                    // contract.
-                    timeline_event_diffs = diffs;
-                }
-            }
-
-            timeline_event_diffs
-        };
+        let stored_prev_batch_token =
+            self.state.write().await.handle_sync(timeline, &self.sender).await?;
 
         // Now that all events have been added, we can trigger the
         // `pagination_token_notifier`.
-        if prev_batch.is_some() {
+        if stored_prev_batch_token {
             self.pagination_batch_token_notifier.notify_one();
         }
 
-        // The order of `RoomEventCacheUpdate`s is **really** important here.
+        // State sent the timeline diff updates, so we can send updates about the
+        // related events now.
         {
-            if !timeline_event_diffs.is_empty() {
-                let _ = self.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
-                    diffs: timeline_event_diffs,
-                    origin: EventsOrigin::Sync,
-                });
-            }
-
             if !ephemeral_events.is_empty() {
                 let _ = self
                     .sender
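The net effect of this hunk: the caller no longer performs deduplication and gap bookkeeping itself; it only learns, via the returned boolean, whether a previous-batch token was stored. A self-contained toy model of that contract (all names here are hypothetical stand-ins, not this crate's API; the real `handle_sync` also pushes events and broadcasts timeline diffs):

```rust
use tokio::sync::Notify;

struct ToyState;

impl ToyState {
    /// Stand-in for `RoomEventCacheState::handle_sync`: returns `true` iff a
    /// previous-batch token survived deduplication and was stored.
    async fn handle_sync(&mut self, prev_batch: Option<String>, all_duplicates: bool) -> bool {
        // Simplified rule for illustration only; the real condition also
        // looks at `timeline.limited` and whether any events are known.
        prev_batch.is_some() && !all_duplicates
    }
}

#[tokio::main]
async fn main() {
    let notifier = Notify::new();
    let mut state = ToyState;

    // Mirrors the caller above: wake back-pagination only when a token was
    // actually stored, so listeners don't wake on fully deduplicated syncs.
    if state.handle_sync(Some("prev_batch_token".to_owned()), false).await {
        notifier.notify_one();
    }
}
```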
@@ -556,6 +458,7 @@ mod private {
             lazy_loader, ChunkContent, ChunkIdentifierGenerator, LinkedChunkId, Position, Update,
         },
         serde_helpers::extract_thread_root,
+        sync::Timeline,
     };
     use matrix_sdk_common::executor::spawn;
     use ruma::{
@@ -566,14 +469,17 @@ mod private {
         serde::Raw,
         EventId, OwnedEventId, OwnedRoomId, RoomVersionId,
     };
+    use tokio::sync::broadcast::Sender;
     use tracing::{debug, error, instrument, trace, warn};
 
     use super::{
         super::{deduplicator::DeduplicationOutcome, EventCacheError},
         events::RoomEvents,
         sort_positions_descending, EventLocation, LoadMoreEventsBackwardsOutcome,
     };
-    use crate::event_cache::{deduplicator::filter_duplicate_events, RoomPaginationStatus};
+    use crate::event_cache::{
+        deduplicator::filter_duplicate_events, RoomEventCacheUpdate, RoomPaginationStatus,
+    };
 
     /// State for a single room's event cache.
     ///
@@ -1396,6 +1302,117 @@ mod private {
 
             Ok(())
         }
+
+        /// Handle the result of a sync.
+        pub async fn handle_sync(
+            &mut self,
+            mut timeline: Timeline,
+            sender: &Sender<RoomEventCacheUpdate>,
+        ) -> Result<bool, EventCacheError> {
+            let mut prev_batch = timeline.prev_batch.take();
+
+            let (
+                DeduplicationOutcome {
+                    all_events: events,
+                    in_memory_duplicated_event_ids,
+                    in_store_duplicated_event_ids,
+                },
+                all_duplicates,
+            ) = self.collect_valid_and_duplicated_events(timeline.events).await?;
+
+            // If the timeline isn't limited, and we already knew about some past events,
+            // then this definitely knows what the timeline head is (either we know
+            // about all the events persisted in storage, or we have a gap
+            // somewhere). In this case, we can ditch the previous-batch
+            // token, which is an optimization to avoid unnecessary future back-pagination
+            // requests.
+            //
+            // We can also ditch it if we knew about all the events that came from sync,
+            // namely, they were all deduplicated. In this case, using the
+            // previous-batch token would only result in fetching other events we
+            // knew about. This is slightly incorrect in the presence of
+            // network splits, but this has shown to be Good Enough™.
+            if !timeline.limited && self.events.events().next().is_some() || all_duplicates {
+                prev_batch = None;
+            }
+
+            // During a sync, when a duplicated event is found, the old event is removed and
+            // the new event is added.
+            //
+            // Let's remove the old events that are duplicated.
+            let timeline_event_diffs = if all_duplicates {
+                // No new events, thus no need to change the room events.
+                vec![]
+            } else {
+                // Remove the old duplicated events.
+                //
+                // We don't have to worry the removals can change the position of the
+                // existing events, because we are pushing all _new_
+                // `events` at the back.
+                let mut timeline_event_diffs = self
+                    .remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
+                    .await?;
+
+                // Add the previous back-pagination token (if present), followed by the timeline
+                // events themselves.
+                let new_timeline_event_diffs = self
+                    .with_events_mut(true, |room_events| {
+                        // If we only received duplicated events, we don't need to store the gap: if
+                        // there was a gap, we'd have received an unknown event at the tail of
+                        // the room's timeline (unless the server reordered sync events since the
+                        // last time we sync'd).
+                        if let Some(prev_token) = &prev_batch {
+                            // As a tiny optimization: remove the last chunk if it's an empty event
+                            // one, as it's not useful to keep it before a gap.
+                            let prev_chunk_to_remove =
+                                room_events.rchunks().next().and_then(|chunk| {
+                                    (chunk.is_items() && chunk.num_items() == 0)
+                                        .then_some(chunk.identifier())
+                                });
+
+                            room_events.push_gap(Gap { prev_token: prev_token.clone() });
+
+                            if let Some(prev_chunk_to_remove) = prev_chunk_to_remove {
+                                room_events.remove_empty_chunk_at(prev_chunk_to_remove).expect(
+                                    "we just checked the chunk is there, and it's an empty item chunk",
+                                );
+                            }
+                        }
+
+                        room_events.push_events(events.clone());
+
+                        events.clone()
+                    })
+                    .await?;
+
+                timeline_event_diffs.extend(new_timeline_event_diffs);
+
+                if timeline.limited && prev_batch.is_some() {
+                    // If there was a previous batch token for a limited timeline, unload the chunks
+                    // so it only contains the last one; otherwise, there might be a
+                    // valid gap in between, and observers may not render it (yet).
+                    //
+                    // We must do this *after* the above call to `.with_events_mut`, so the new
+                    // events and gaps are properly persisted to storage.
+                    if let Some(diffs) = self.shrink_to_last_chunk().await? {
+                        // Override the diffs with the new ones, as per `shrink_to_last_chunk`'s API
+                        // contract.
+                        timeline_event_diffs = diffs;
+                    }
+                }
+
+                timeline_event_diffs
+            };
+
+            if !timeline_event_diffs.is_empty() {
+                let _ = sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
+                    diffs: timeline_event_diffs,
+                    origin: crate::event_cache::EventsOrigin::Sync,
+                });
+            }
+
+            Ok(prev_batch.is_some())
+        }
     }
 }
 
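One subtlety worth calling out in the moved `handle_sync` body is the token-ditching condition: `&&` binds tighter than `||` in Rust, so it groups as "(not limited AND some events already known) OR all duplicates". A standalone restatement (helper and variable names are hypothetical, for illustration only):

```rust
/// Hypothetical helper restating the previous-batch ditching rule from
/// `handle_sync` above.
fn should_ditch_prev_batch(limited: bool, knows_some_events: bool, all_duplicates: bool) -> bool {
    // Precedence: `&&` binds tighter than `||`.
    (!limited && knows_some_events) || all_duplicates
}

fn main() {
    // Unlimited sync onto a non-empty cache: the timeline head is already
    // known, so the token would only trigger pointless back-paginations.
    assert!(should_ditch_prev_batch(false, true, false));

    // Limited sync with genuinely new events: keep the token; there may be a
    // gap to back-fill.
    assert!(!should_ditch_prev_batch(true, false, false));

    // Every sync event was a duplicate: the token would only re-fetch events
    // we already know about, so drop it.
    assert!(should_ditch_prev_batch(true, true, true));
}
```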