Skip to content

Commit a884b2c

Browse files
committed
refactor(event cache): move handling of a backpagination in RoomEventCacheState
1 parent ec0d7b4 commit a884b2c

File tree

2 files changed

+167
-164
lines changed

2 files changed

+167
-164
lines changed

crates/matrix-sdk/src/event_cache/pagination.rs

Lines changed: 5 additions & 161 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,14 @@
1717
use std::{sync::Arc, time::Duration};
1818

1919
use eyeball::{SharedObservable, Subscriber};
20-
use matrix_sdk_base::{
21-
deserialized_responses::TimelineEvent, linked_chunk::ChunkIdentifier, timeout::timeout,
22-
};
20+
use matrix_sdk_base::timeout::timeout;
2321
use matrix_sdk_common::linked_chunk::ChunkContent;
2422
use ruma::api::Direction;
25-
use tokio::sync::RwLockWriteGuard;
2623
use tracing::{debug, instrument, trace};
2724

2825
use super::{
29-
deduplicator::DeduplicationOutcome,
3026
room::{events::Gap, LoadMoreEventsBackwardsOutcome, RoomEventCacheInner},
31-
BackPaginationOutcome, EventsOrigin, Result, RoomEventCacheState, RoomEventCacheUpdate,
27+
BackPaginationOutcome, EventsOrigin, Result, RoomEventCacheUpdate,
3228
};
3329
use crate::{event_cache::EventCacheError, room::MessagesOptions};
3430

@@ -281,7 +277,7 @@ impl RoomPagination {
281277

282278
// Make sure the `RoomEvents` isn't updated while we are saving events from
283279
// backpagination.
284-
let state = self.inner.state.write().await;
280+
let mut state = self.inner.state.write().await;
285281

286282
// Check that the previous token still exists; otherwise it's a sign that the
287283
// room's timeline has been cleared.
@@ -305,164 +301,12 @@ impl RoomPagination {
305301
None
306302
};
307303

308-
self.handle_network_pagination_result(state, events, new_gap, prev_gap_chunk_id)
304+
state
305+
.handle_backpagination(events, new_gap, prev_gap_chunk_id, &self.inner.sender)
309306
.await
310307
.map(Some)
311308
}
312309

313-
/// Handle the result of a successful network back-pagination.
314-
async fn handle_network_pagination_result(
315-
&self,
316-
mut state: RwLockWriteGuard<'_, RoomEventCacheState>,
317-
events: Vec<TimelineEvent>,
318-
new_gap: Option<Gap>,
319-
prev_gap_id: Option<ChunkIdentifier>,
320-
) -> Result<BackPaginationOutcome> {
321-
// If there's no new previous gap, then we've reached the start of the timeline.
322-
let network_reached_start = new_gap.is_none();
323-
324-
let (
325-
DeduplicationOutcome {
326-
all_events: mut events,
327-
in_memory_duplicated_event_ids,
328-
in_store_duplicated_event_ids,
329-
},
330-
all_duplicates,
331-
) = state.collect_valid_and_duplicated_events(events).await?;
332-
333-
// If not all the events have been back-paginated, we need to remove the
334-
// previous ones, otherwise we can end up with misordered events.
335-
//
336-
// Consider the following scenario:
337-
// - sync returns [D, E, F]
338-
// - then sync returns [] with a previous batch token PB1, so the internal
339-
// linked chunk state is [D, E, F, PB1].
340-
// - back-paginating with PB1 may return [A, B, C, D, E, F].
341-
//
342-
// Only inserting the new events when replacing PB1 would result in a timeline
343-
// ordering of [D, E, F, A, B, C], which is incorrect. So we do have to remove
344-
// all the events, in case this happens (see also #4746).
345-
346-
let mut event_diffs = if !all_duplicates {
347-
// Let's forget all the previous events.
348-
state
349-
.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
350-
.await?
351-
} else {
352-
// All new events are duplicated, they can all be ignored.
353-
events.clear();
354-
Default::default()
355-
};
356-
357-
let next_diffs = state
358-
.with_events_mut(false, |room_events| {
359-
// Reverse the order of the events as `/messages` has been called with `dir=b`
360-
// (backwards). The `RoomEvents` API expects the first event to be the oldest.
361-
// Let's re-order them for this block.
362-
let reversed_events = events.iter().rev().cloned().collect::<Vec<_>>();
363-
364-
let first_event_pos = room_events.events().next().map(|(item_pos, _)| item_pos);
365-
366-
// First, insert events.
367-
let insert_new_gap_pos = if let Some(gap_id) = prev_gap_id {
368-
// There is a prior gap, let's replace it by new events!
369-
if all_duplicates {
370-
assert!(reversed_events.is_empty());
371-
}
372-
373-
trace!("replacing previous gap with the back-paginated events");
374-
375-
// Replace the gap with the events we just deduplicated. This might get rid of
376-
// the underlying gap, if the conditions are favorable to
377-
// us.
378-
room_events
379-
.replace_gap_at(reversed_events.clone(), gap_id)
380-
.expect("gap_identifier is a valid chunk id we read previously")
381-
} else if let Some(pos) = first_event_pos {
382-
// No prior gap, but we had some events: assume we need to prepend events
383-
// before those.
384-
trace!("inserted events before the first known event");
385-
386-
room_events
387-
.insert_events_at(reversed_events.clone(), pos)
388-
.expect("pos is a valid position we just read above");
389-
390-
Some(pos)
391-
} else {
392-
// No prior gap, and no prior events: push the events.
393-
trace!("pushing events received from back-pagination");
394-
395-
room_events.push_events(reversed_events.clone());
396-
397-
// A new gap may be inserted before the new events, if there are any.
398-
room_events.events().next().map(|(item_pos, _)| item_pos)
399-
};
400-
401-
// And insert the new gap if needs be.
402-
//
403-
// We only do this when at least one new, non-duplicated event, has been added
404-
// to the chunk. Otherwise it means we've back-paginated all the known events.
405-
if !all_duplicates {
406-
if let Some(new_gap) = new_gap {
407-
if let Some(new_pos) = insert_new_gap_pos {
408-
room_events
409-
.insert_gap_at(new_gap, new_pos)
410-
.expect("events_chunk_pos represents a valid chunk position");
411-
} else {
412-
room_events.push_gap(new_gap);
413-
}
414-
}
415-
} else {
416-
debug!(
417-
"not storing previous batch token, because we \
418-
deduplicated all new back-paginated events"
419-
);
420-
}
421-
422-
reversed_events
423-
})
424-
.await?;
425-
426-
event_diffs.extend(next_diffs);
427-
428-
// There could be an inconsistency between the network (which thinks we hit the
429-
// start of the timeline) and the disk (which has the initial empty
430-
// chunks), so tweak the `reached_start` value so that it reflects the disk
431-
// state in priority instead.
432-
let reached_start = {
433-
// There are no gaps.
434-
let has_gaps = state.events().chunks().any(|chunk| chunk.is_gap());
435-
436-
// The first chunk has no predecessors.
437-
let first_chunk_is_definitive_head =
438-
state.events().chunks().next().map(|chunk| chunk.is_definitive_head());
439-
440-
let reached_start =
441-
!has_gaps && first_chunk_is_definitive_head.unwrap_or(network_reached_start);
442-
443-
trace!(
444-
?network_reached_start,
445-
?has_gaps,
446-
?first_chunk_is_definitive_head,
447-
?reached_start,
448-
"finished handling network back-pagination"
449-
);
450-
451-
reached_start
452-
};
453-
454-
let backpagination_outcome = BackPaginationOutcome { events, reached_start };
455-
456-
if !event_diffs.is_empty() {
457-
let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
458-
diffs: event_diffs,
459-
origin: EventsOrigin::Pagination,
460-
});
461-
}
462-
463-
Ok(backpagination_outcome)
464-
}
465-
466310
/// Returns a subscriber to the pagination status used for the
467311
/// back-pagination integrated to the event cache.
468312
pub fn status(&self) -> Subscriber<RoomPaginationStatus> {

crates/matrix-sdk/src/event_cache/room/mod.rs

Lines changed: 162 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,8 @@ mod private {
455455
},
456456
event_cache::{store::EventCacheStoreLock, Event, Gap},
457457
linked_chunk::{
458-
lazy_loader, ChunkContent, ChunkIdentifierGenerator, LinkedChunkId, Position, Update,
458+
lazy_loader, ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, LinkedChunkId,
459+
Position, Update,
459460
},
460461
serde_helpers::extract_thread_root,
461462
sync::Timeline,
@@ -478,7 +479,8 @@ mod private {
478479
sort_positions_descending, EventLocation, LoadMoreEventsBackwardsOutcome,
479480
};
480481
use crate::event_cache::{
481-
deduplicator::filter_duplicate_events, RoomEventCacheUpdate, RoomPaginationStatus,
482+
deduplicator::filter_duplicate_events, BackPaginationOutcome, EventsOrigin,
483+
RoomEventCacheUpdate, RoomPaginationStatus,
482484
};
483485

484486
/// State for a single room's event cache.
@@ -1304,6 +1306,12 @@ mod private {
13041306
}
13051307

13061308
/// Handle the result of a sync.
1309+
///
1310+
/// It may send room event cache updates to the given sender, if it
1311+
/// generated any of those.
1312+
///
1313+
/// Returns true if a new gap (previous-batch token) has been inserted,
1314+
/// false otherwise.
13071315
pub async fn handle_sync(
13081316
&mut self,
13091317
mut timeline: Timeline,
@@ -1407,12 +1415,163 @@ mod private {
14071415
if !timeline_event_diffs.is_empty() {
14081416
let _ = sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
14091417
diffs: timeline_event_diffs,
1410-
origin: crate::event_cache::EventsOrigin::Sync,
1418+
origin: EventsOrigin::Sync,
14111419
});
14121420
}
14131421

14141422
Ok(prev_batch.is_some())
14151423
}
1424+
1425+
pub async fn handle_backpagination(
1426+
&mut self,
1427+
events: Vec<TimelineEvent>,
1428+
new_gap: Option<Gap>,
1429+
prev_gap_id: Option<ChunkIdentifier>,
1430+
sender: &Sender<RoomEventCacheUpdate>,
1431+
) -> Result<BackPaginationOutcome, EventCacheError> {
1432+
// If there's no new gap (previous batch token), then we've reached the start of
1433+
// the timeline.
1434+
let network_reached_start = new_gap.is_none();
1435+
1436+
let (
1437+
DeduplicationOutcome {
1438+
all_events: mut events,
1439+
in_memory_duplicated_event_ids,
1440+
in_store_duplicated_event_ids,
1441+
},
1442+
all_duplicates,
1443+
) = self.collect_valid_and_duplicated_events(events).await?;
1444+
1445+
// If not all the events have been back-paginated, we need to remove the
1446+
// previous ones, otherwise we can end up with misordered events.
1447+
//
1448+
// Consider the following scenario:
1449+
// - sync returns [D, E, F]
1450+
// - then sync returns [] with a previous batch token PB1, so the internal
1451+
// linked chunk state is [D, E, F, PB1].
1452+
// - back-paginating with PB1 may return [A, B, C, D, E, F].
1453+
//
1454+
// Only inserting the new events when replacing PB1 would result in a timeline
1455+
// ordering of [D, E, F, A, B, C], which is incorrect. So we do have to remove
1456+
// all the events, in case this happens (see also #4746).
1457+
1458+
let mut event_diffs = if !all_duplicates {
1459+
// Let's forget all the previous events.
1460+
self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
1461+
.await?
1462+
} else {
1463+
// All new events are duplicated, they can all be ignored.
1464+
events.clear();
1465+
Default::default()
1466+
};
1467+
1468+
let next_diffs = self
1469+
.with_events_mut(false, |room_events| {
1470+
// Reverse the order of the events as `/messages` has been called with `dir=b`
1471+
// (backwards). The `RoomEvents` API expects the first event to be the oldest.
1472+
// Let's re-order them for this block.
1473+
let reversed_events = events
1474+
.iter()
1475+
.rev()
1476+
.cloned()
1477+
.collect::<Vec<_>>();
1478+
1479+
let first_event_pos = room_events.events().next().map(|(item_pos, _)| item_pos);
1480+
1481+
// First, insert events.
1482+
let insert_new_gap_pos = if let Some(gap_id) = prev_gap_id {
1483+
// There is a prior gap, let's replace it by new events!
1484+
if all_duplicates {
1485+
assert!(reversed_events.is_empty());
1486+
}
1487+
1488+
trace!("replacing previous gap with the back-paginated events");
1489+
1490+
// Replace the gap with the events we just deduplicated. This might get rid of the
1491+
// underlying gap, if the conditions are favorable to us.
1492+
room_events.replace_gap_at(reversed_events.clone(), gap_id)
1493+
.expect("gap_identifier is a valid chunk id we read previously")
1494+
} else if let Some(pos) = first_event_pos {
1495+
// No prior gap, but we had some events: assume we need to prepend events
1496+
// before those.
1497+
trace!("inserted events before the first known event");
1498+
1499+
room_events
1500+
.insert_events_at(reversed_events.clone(), pos)
1501+
.expect("pos is a valid position we just read above");
1502+
1503+
Some(pos)
1504+
} else {
1505+
// No prior gap, and no prior events: push the events.
1506+
trace!("pushing events received from back-pagination");
1507+
1508+
room_events.push_events(reversed_events.clone());
1509+
1510+
// A new gap may be inserted before the new events, if there are any.
1511+
room_events.events().next().map(|(item_pos, _)| item_pos)
1512+
};
1513+
1514+
// And insert the new gap if needs be.
1515+
//
1516+
// We only do this when at least one new, non-duplicated event, has been added to
1517+
// the chunk. Otherwise it means we've back-paginated all the known events.
1518+
if !all_duplicates {
1519+
if let Some(new_gap) = new_gap {
1520+
if let Some(new_pos) = insert_new_gap_pos {
1521+
room_events
1522+
.insert_gap_at(new_gap, new_pos)
1523+
.expect("events_chunk_pos represents a valid chunk position");
1524+
} else {
1525+
room_events.push_gap(new_gap);
1526+
}
1527+
}
1528+
} else {
1529+
debug!("not storing previous batch token, because we deduplicated all new back-paginated events");
1530+
}
1531+
1532+
reversed_events
1533+
})
1534+
.await?;
1535+
1536+
event_diffs.extend(next_diffs);
1537+
1538+
// There could be an inconsistency between the network (which thinks we hit the
1539+
// start of the timeline) and the disk (which has the initial empty
1540+
// chunks), so tweak the `reached_start` value so that it reflects the disk
1541+
// state in priority instead.
1542+
let reached_start = {
1543+
// There are no gaps.
1544+
let has_gaps = self.events.chunks().any(|chunk| chunk.is_gap());
1545+
1546+
// The first chunk has no predecessors.
1547+
let first_chunk_is_definitive_head =
1548+
self.events.chunks().next().map(|chunk| chunk.is_definitive_head());
1549+
1550+
let reached_start =
1551+
!has_gaps && first_chunk_is_definitive_head.unwrap_or(network_reached_start);
1552+
1553+
trace!(
1554+
?network_reached_start,
1555+
?has_gaps,
1556+
?first_chunk_is_definitive_head,
1557+
?reached_start,
1558+
"finished handling network back-pagination"
1559+
);
1560+
1561+
reached_start
1562+
};
1563+
1564+
let backpagination_outcome = BackPaginationOutcome { events, reached_start };
1565+
1566+
if !event_diffs.is_empty() {
1567+
let _ = sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
1568+
diffs: event_diffs,
1569+
origin: EventsOrigin::Pagination,
1570+
});
1571+
}
1572+
1573+
Ok(backpagination_outcome)
1574+
}
14161575
}
14171576
}
14181577

0 commit comments

Comments
 (0)