From 189832e62186066995e4021ca9bbfde1c0be6f84 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Tue, 15 Jul 2025 15:57:03 +0200 Subject: [PATCH] Revert "fix(sdk): Disable `OrderTracker` for the moment." This reverts commit c5f2460e02122eb4e3333e4acb97ae099d63e00f. --- .../matrix-sdk/src/event_cache/room/events.rs | 108 +++++- crates/matrix-sdk/src/event_cache/room/mod.rs | 343 +++++++++++++++++- .../tests/integration/event_cache/mod.rs | 164 ++++++++- 3 files changed, 585 insertions(+), 30 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/room/events.rs b/crates/matrix-sdk/src/event_cache/room/events.rs index b22195c1dfc..b683d433963 100644 --- a/crates/matrix-sdk/src/event_cache/room/events.rs +++ b/crates/matrix-sdk/src/event_cache/room/events.rs @@ -19,7 +19,7 @@ use matrix_sdk_base::{ event_cache::store::DEFAULT_CHUNK_CAPACITY, linked_chunk::{ lazy_loader::{self, LazyLoaderError}, - ChunkContent, ChunkIdentifierGenerator, RawChunk, + ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, OrderTracker, RawChunk, }, }; use matrix_sdk_common::linked_chunk::{ @@ -38,6 +38,9 @@ pub(in crate::event_cache) struct EventLinkedChunk { /// /// [`Update`]: matrix_sdk_base::linked_chunk::Update chunks_updates_as_vectordiffs: AsVector, + + /// Tracker of the events ordering in this room. + pub order_tracker: OrderTracker, } impl Default for EventLinkedChunk { @@ -49,7 +52,7 @@ impl Default for EventLinkedChunk { impl EventLinkedChunk { /// Build a new [`EventLinkedChunk`] struct with zero events. pub fn new() -> Self { - Self::with_initial_linked_chunk(None) + Self::with_initial_linked_chunk(None, None) } /// Build a new [`EventLinkedChunk`] struct with prior chunks knowledge. @@ -57,6 +60,7 @@ impl EventLinkedChunk { /// The provided [`LinkedChunk`] must have been built with update history. pub fn with_initial_linked_chunk( linked_chunk: Option>, + full_linked_chunk_metadata: Option>, ) -> Self { let mut linked_chunk = linked_chunk.unwrap_or_else(LinkedChunk::new_with_update_history); @@ -64,7 +68,11 @@ impl EventLinkedChunk { .as_vector() .expect("`LinkedChunk` must have been built with `new_with_update_history`"); - Self { chunks: linked_chunk, chunks_updates_as_vectordiffs } + let order_tracker = linked_chunk + .order_tracker(full_linked_chunk_metadata) + .expect("`LinkedChunk` must have been built with `new_with_update_history`"); + + Self { chunks: linked_chunk, chunks_updates_as_vectordiffs, order_tracker } } /// Clear all events. @@ -186,6 +194,36 @@ impl EventLinkedChunk { self.chunks.items() } + /// Return the order of an event in the room linked chunk. + /// + /// Can return `None` if the event can't be found in the linked chunk. + pub fn event_order(&self, event_pos: Position) -> Option { + self.order_tracker.ordering(event_pos) + } + + #[cfg(any(test, debug_assertions))] + #[allow(dead_code)] // Temporarily, until we figure out why it's crashing production builds. + fn assert_event_ordering(&self) { + let mut iter = self.chunks.items().enumerate(); + let Some((i, (first_event_pos, _))) = iter.next() else { + return; + }; + + // Sanity check. + assert_eq!(i, 0); + + // That's the offset in the full linked chunk. Will be 0 if the linked chunk is + // entirely loaded, may be non-zero otherwise. + let offset = + self.event_order(first_event_pos).expect("first event's ordering must be known"); + + for (i, (next_pos, _)) in iter { + let next_index = + self.event_order(next_pos).expect("next event's ordering must be known"); + assert_eq!(offset + i, next_index, "event ordering must be continuous"); + } + } + /// Get all updates from the room events as [`VectorDiff`]. /// /// Be careful that each `VectorDiff` is returned only once! @@ -194,7 +232,11 @@ impl EventLinkedChunk { /// /// [`Update`]: matrix_sdk_base::linked_chunk::Update pub fn updates_as_vector_diffs(&mut self) -> Vec> { - self.chunks_updates_as_vectordiffs.take() + let updates = self.chunks_updates_as_vectordiffs.take(); + + self.order_tracker.flush_updates(false); + + updates } /// Get a mutable reference to the [`LinkedChunk`] updates, aka @@ -214,7 +256,8 @@ impl EventLinkedChunk { let mut result = Vec::new(); for chunk in self.chunks.chunks() { - let content = chunk_debug_string(chunk.content()); + let content = + chunk_debug_string(chunk.identifier(), chunk.content(), &self.order_tracker); let lazy_previous = if let Some(cid) = chunk.lazy_previous() { format!(" (lazy previous = {})", cid.index()) } else { @@ -357,6 +400,28 @@ impl EventLinkedChunk { // Methods related to lazy-loading. impl EventLinkedChunk { + /// Inhibits all the linked chunk updates caused by the function `f` on the + /// ordering tracker. + /// + /// Updates to the linked chunk that happen because of lazy loading must not + /// be taken into account by the order tracker, otherwise the + /// fully-loaded state (tracked by the order tracker) wouldn't match + /// reality anymore. This provides a facility to help applying such + /// updates. + fn inhibit_updates_to_ordering_tracker R, R>(&mut self, f: F) -> R { + // Start by flushing previous pending updates to the chunk ordering, if any. + self.order_tracker.flush_updates(false); + + // Call the function. + let r = f(self); + + // Now, flush other pending updates which have been caused by the function, and + // ignore them. + self.order_tracker.flush_updates(true); + + r + } + /// Replace the events with the given last chunk of events and generator. /// /// Happens only during lazy loading. @@ -368,7 +433,11 @@ impl EventLinkedChunk { last_chunk: Option>, chunk_identifier_generator: ChunkIdentifierGenerator, ) -> Result<(), LazyLoaderError> { - lazy_loader::replace_with(&mut self.chunks, last_chunk, chunk_identifier_generator) + // Since `replace_with` is used only to unload some chunks, we don't want it to + // affect the chunk ordering. + self.inhibit_updates_to_ordering_tracker(move |this| { + lazy_loader::replace_with(&mut this.chunks, last_chunk, chunk_identifier_generator) + }) } /// Prepends a lazily-loaded chunk at the beginning of the linked chunk. @@ -376,12 +445,20 @@ impl EventLinkedChunk { &mut self, raw_new_first_chunk: RawChunk, ) -> Result<(), LazyLoaderError> { - lazy_loader::insert_new_first_chunk(&mut self.chunks, raw_new_first_chunk) + // This is only used when reinserting a chunk that was in persisted storage, so + // we don't need to touch the chunk ordering for this. + self.inhibit_updates_to_ordering_tracker(move |this| { + lazy_loader::insert_new_first_chunk(&mut this.chunks, raw_new_first_chunk) + }) } } /// Create a debug string for a [`ChunkContent`] for an event/gap pair. -fn chunk_debug_string(content: &ChunkContent) -> String { +fn chunk_debug_string( + chunk_id: ChunkIdentifier, + content: &ChunkContent, + order_tracker: &OrderTracker, +) -> String { match content { ChunkContent::Gap(Gap { prev_token }) => { format!("gap['{prev_token}']") @@ -389,12 +466,18 @@ fn chunk_debug_string(content: &ChunkContent) -> String { ChunkContent::Items(vec) => { let items = vec .iter() - .map(|event| { + .enumerate() + .map(|(i, event)| { event.event_id().map_or_else( || "".to_owned(), |id| { + let pos = Position::new(chunk_id, i); + let order = format!("#{}: ", order_tracker.ordering(pos).unwrap()); + // Limit event ids to 8 chars *after* the $. - id.as_str().chars().take(1 + 8).collect::() + let event_id = id.as_str().chars().take(1 + 8).collect::(); + + format!("{order}{event_id}") }, ) }) @@ -679,10 +762,13 @@ mod tests { ]); linked_chunk.chunks.push_gap_back(Gap { prev_token: "raclette".to_owned() }); + // Flush updates to the order tracker. + let _ = linked_chunk.updates_as_vector_diffs(); + let output = linked_chunk.debug_string(); assert_eq!(output.len(), 2); - assert_eq!(&output[0], "chunk #0: events[$12345678, $2]"); + assert_eq!(&output[0], "chunk #0: events[#0: $12345678, #1: $2]"); assert_eq!(&output[1], "chunk #1: gap['raclette']"); } diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index accca7056c8..1db6f3b8803 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -618,10 +618,13 @@ mod private { use matrix_sdk_base::{ apply_redaction, deserialized_responses::{ThreadSummary, ThreadSummaryStatus, TimelineEventKind}, - event_cache::{store::EventCacheStoreLock, Event, Gap}, + event_cache::{ + store::{DynEventCacheStore, EventCacheStoreLock}, + Event, Gap, + }, linked_chunk::{ lazy_loader::{self}, - ChunkContent, ChunkIdentifierGenerator, LinkedChunkId, Position, Update, + ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position, Update, }, serde_helpers::extract_thread_root, sync::Timeline, @@ -704,29 +707,42 @@ mod private { let linked_chunk_id = LinkedChunkId::Room(&room_id); - let linked_chunk = match store_lock + // Load the full linked chunk's metadata, so as to feed the order tracker. + // + // If loading the full linked chunk failed, we'll clear the event cache, as it + // indicates that at some point, there's some malformed data. + let full_linked_chunk_metadata = + match Self::load_linked_chunk_metadata(&*store_lock, linked_chunk_id).await { + Ok(metas) => metas, + Err(err) => { + error!( + "error when loading a linked chunk's metadata from the store: {err}" + ); + + // Clear storage for this room. + store_lock + .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]) + .await?; + + // Restart with an empty linked chunk. + None + } + }; + + let linked_chunk = store_lock .load_last_chunk(linked_chunk_id) .await .map_err(EventCacheError::from) .and_then(|(last_chunk, chunk_identifier_generator)| { lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator) .map_err(EventCacheError::from) - }) { - Ok(linked_chunk) => linked_chunk, - Err(err) => { - error!("error when loading the linked chunk from the store: {err}"); - - // Clear storage for this room. - store_lock - .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]) - .await?; - - // Restart with an empty linked chunk. - None - } - }; + }) + .expect("fully loading the linked chunk just worked, so loading it partially should also work"); - let room_linked_chunk = EventLinkedChunk::with_initial_linked_chunk(linked_chunk); + let room_linked_chunk = EventLinkedChunk::with_initial_linked_chunk( + linked_chunk, + full_linked_chunk_metadata, + ); // The threads mapping is intentionally empty at start, since we're going to // reload threads lazily, as soon as we need to (based on external @@ -746,6 +762,131 @@ mod private { }) } + /// Load a linked chunk's full metadata, making sure the chunks are + /// according to their their links. + /// + /// Returns `None` if there's no such linked chunk in the store, or an + /// error if the linked chunk is malformed. + async fn load_linked_chunk_metadata( + store: &DynEventCacheStore, + linked_chunk_id: LinkedChunkId<'_>, + ) -> Result>, EventCacheError> { + let mut all_chunks = store + .load_all_chunks_metadata(linked_chunk_id) + .await + .map_err(EventCacheError::from)?; + + if all_chunks.is_empty() { + // There are no chunks, so there's nothing to do. + return Ok(None); + } + + // Transform the vector into a hashmap, for quick lookup of the predecessors. + let chunk_map: HashMap<_, _> = + all_chunks.iter().map(|meta| (meta.identifier, meta)).collect(); + + // Find a last chunk. + let mut iter = all_chunks.iter().filter(|meta| meta.next.is_none()); + let Some(last) = iter.next() else { + return Err(EventCacheError::InvalidLinkedChunkMetadata { + details: "no last chunk found".to_owned(), + }); + }; + + // There must at most one last chunk. + if let Some(other_last) = iter.next() { + return Err(EventCacheError::InvalidLinkedChunkMetadata { + details: format!( + "chunks {} and {} both claim to be last chunks", + last.identifier.index(), + other_last.identifier.index() + ), + }); + } + + // Rewind the chain back to the first chunk, and do some checks at the same + // time. + let mut seen = HashSet::new(); + let mut current = last; + loop { + // If we've already seen this chunk, there's a cycle somewhere. + if !seen.insert(current.identifier) { + return Err(EventCacheError::InvalidLinkedChunkMetadata { + details: format!( + "cycle detected in linked chunk at {}", + current.identifier.index() + ), + }); + } + + let Some(prev_id) = current.previous else { + // If there's no previous chunk, we're done. + if seen.len() != all_chunks.len() { + return Err(EventCacheError::InvalidLinkedChunkMetadata { + details: format!( + "linked chunk likely has multiple components: {} chunks seen through the chain of predecessors, but {} expected", + seen.len(), + all_chunks.len() + ), + }); + } + break; + }; + + // If the previous chunk is not in the map, then it's unknown + // and missing. + let Some(pred_meta) = chunk_map.get(&prev_id) else { + return Err(EventCacheError::InvalidLinkedChunkMetadata { + details: format!( + "missing predecessor {} chunk for {}", + prev_id.index(), + current.identifier.index() + ), + }); + }; + + // If the previous chunk isn't connected to the next, then the link is invalid. + if pred_meta.next != Some(current.identifier) { + return Err(EventCacheError::InvalidLinkedChunkMetadata { + details: format!( + "chunk {}'s next ({:?}) doesn't match the current chunk ({})", + pred_meta.identifier.index(), + pred_meta.next.map(|chunk_id| chunk_id.index()), + current.identifier.index() + ), + }); + } + + current = *pred_meta; + } + + // At this point, `current` is the identifier of the first chunk. + // + // Reorder the resulting vector, by going through the chain of `next` links, and + // swapping items into their final position. + // + // Invariant in this loop: all items in [0..i[ are in their final, correct + // position. + let mut current = current.identifier; + for i in 0..all_chunks.len() { + // Find the target metadata. + let j = all_chunks + .iter() + .rev() + .position(|meta| meta.identifier == current) + .map(|j| all_chunks.len() - 1 - j) + .expect("the target chunk must be present in the metadata"); + if i != j { + all_chunks.swap(i, j); + } + if let Some(next) = all_chunks[i].next { + current = next; + } + } + + Ok(Some(all_chunks)) + } + /// Given a fully-loaded linked chunk with no gaps, return the /// [`LoadMoreEventsBackwardsOutcome`] expected for this room's cache. fn conclude_load_more_for_fully_loaded_chunk(&mut self) -> LoadMoreEventsBackwardsOutcome { @@ -942,6 +1083,10 @@ mod private { Ok(self.room_linked_chunk.updates_as_vector_diffs()) } + pub(crate) fn room_event_order(&self, event_pos: Position) -> Option { + self.room_linked_chunk.event_order(event_pos) + } + /// Removes the bundled relations from an event, if they were present. /// /// Only replaces the present if it contained bundled relations. @@ -1046,6 +1191,7 @@ mod private { &mut self, updates: Vec>, ) -> Result<(), EventCacheError> { + self.room_linked_chunk.order_tracker.map_updates(&updates); self.send_updates_to_store(updates).await } @@ -1242,6 +1388,32 @@ mod private { trace!(num_related = %related.len(), num_iters, "computed transitive closure of related events"); + // Sort the results by their positions in the linked chunk, if available. + // + // If an event doesn't have a known position, it goes to the start of the array. + related.sort_by(|(_, lhs), (_, rhs)| { + use std::cmp::Ordering; + match (lhs, rhs) { + (None, None) => Ordering::Equal, + (None, Some(_)) => Ordering::Less, + (Some(_), None) => Ordering::Greater, + (Some(lhs), Some(rhs)) => { + let lhs = self.room_event_order(*lhs); + let rhs = self.room_event_order(*rhs); + + // The events should have a definite position, but in the case they don't, + // still consider that not having a position means you'll end at the start + // of the array. + match (lhs, rhs) { + (None, None) => Ordering::Equal, + (None, Some(_)) => Ordering::Less, + (Some(_), None) => Ordering::Greater, + (Some(lhs), Some(rhs)) => lhs.cmp(&rhs), + } + } + } + }); + // Keep only the events, not their positions. let related = related.into_iter().map(|(event, _pos)| event).collect(); @@ -2805,6 +2977,141 @@ mod timed_tests { assert!(outcome.reached_start); } + #[async_test] + async fn test_room_ordering() { + let room_id = room_id!("!galette:saucisse.bzh"); + + let client = MockClientBuilder::new(None).build().await; + + let f = EventFactory::new().room(room_id).sender(*ALICE); + + let evid1 = event_id!("$1"); + let evid2 = event_id!("$2"); + let evid3 = event_id!("$3"); + + let ev1 = f.text_msg("hello world").event_id(evid1).into_event(); + let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event(); + let ev3 = f.text_msg("yo").event_id(evid3).into_event(); + + // Fill the event cache store with an initial linked chunk with 2 events chunks. + { + let store = client.event_cache_store(); + let store = store.lock().await.unwrap(); + store + .handle_linked_chunk_updates( + LinkedChunkId::Room(room_id), + vec![ + Update::NewItemsChunk { + previous: None, + new: ChunkIdentifier::new(0), + next: None, + }, + Update::PushItems { + at: Position::new(ChunkIdentifier::new(0), 0), + items: vec![ev1, ev2], + }, + Update::NewItemsChunk { + previous: Some(ChunkIdentifier::new(0)), + new: ChunkIdentifier::new(1), + next: None, + }, + Update::PushItems { + at: Position::new(ChunkIdentifier::new(1), 0), + items: vec![ev3.clone()], + }, + ], + ) + .await + .unwrap(); + } + + let event_cache = client.event_cache(); + event_cache.subscribe().unwrap(); + + client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); + let room = client.get_room(room_id).unwrap(); + let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); + + // Initially, the linked chunk only contains the last chunk, so only ev3 is + // loaded. + { + let state = room_event_cache.inner.state.read().await; + + // But we can get the order of ev1. + assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 0)), Some(0)); + + // And that of ev2 as well. + assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 1)), Some(1)); + + // ev3, which is loaded, also has a known ordering. + let mut events = state.room_linked_chunk().events(); + let (pos, ev) = events.next().unwrap(); + assert_eq!(pos, Position::new(ChunkIdentifier::new(1), 0)); + assert_eq!(ev.event_id().as_deref(), Some(evid3)); + assert_eq!(state.room_event_order(pos), Some(2)); + + // No other loaded events. + assert!(events.next().is_none()); + } + + // Force loading the full linked chunk by back-paginating. + let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap(); + assert!(outcome.reached_start); + + // All events are now loaded, so their order is precisely their enumerated index + // in a linear iteration. + { + let state = room_event_cache.inner.state.read().await; + for (i, (pos, _)) in state.room_linked_chunk().events().enumerate() { + assert_eq!(state.room_event_order(pos), Some(i)); + } + } + + // Handle a gappy sync with two events (including one duplicate, so + // deduplication kicks in), so that the linked chunk is shrunk to the + // last chunk, and that the linked chunk only contains the last two + // events. + let evid4 = event_id!("$4"); + room_event_cache + .inner + .handle_joined_room_update(JoinedRoomUpdate { + timeline: Timeline { + limited: true, + prev_batch: Some("fondue".to_owned()), + events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()], + }, + ..Default::default() + }) + .await + .unwrap(); + + { + let state = room_event_cache.inner.state.read().await; + + // After the shrink, only evid3 and evid4 are loaded. + let mut events = state.room_linked_chunk().events(); + + let (pos, ev) = events.next().unwrap(); + assert_eq!(ev.event_id().as_deref(), Some(evid3)); + assert_eq!(state.room_event_order(pos), Some(2)); + + let (pos, ev) = events.next().unwrap(); + assert_eq!(ev.event_id().as_deref(), Some(evid4)); + assert_eq!(state.room_event_order(pos), Some(3)); + + // No other loaded events. + assert!(events.next().is_none()); + + // But we can still get the order of previous events. + assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 0)), Some(0)); + assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 1)), Some(1)); + + // ev3 doesn't have an order with its previous position, since it's been + // deduplicated. + assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(1), 0)), None); + } + } + #[async_test] async fn test_auto_shrink_after_all_subscribers_are_gone() { let room_id = room_id!("!galette:saucisse.bzh"); diff --git a/crates/matrix-sdk/tests/integration/event_cache/mod.rs b/crates/matrix-sdk/tests/integration/event_cache/mod.rs index 5e6bbc92593..48e84006bda 100644 --- a/crates/matrix-sdk/tests/integration/event_cache/mod.rs +++ b/crates/matrix-sdk/tests/integration/event_cache/mod.rs @@ -28,7 +28,10 @@ use matrix_sdk_test::{ }; use ruma::{ event_id, - events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent, TimelineEventType}, + events::{ + room::message::RoomMessageEventContentWithoutRelation, AnySyncMessageLikeEvent, + AnySyncTimelineEvent, TimelineEventType, + }, room_id, user_id, EventId, RoomVersionId, }; use serde_json::json; @@ -2522,3 +2525,162 @@ async fn test_sync_while_back_paginate() { assert!(subscriber.is_empty()); } + +#[async_test] +async fn test_relations_ordering() { + let server = MatrixMockServer::new().await; + + let room_id = room_id!("!galette:saucisse.bzh"); + let f = EventFactory::new().room(room_id).sender(*ALICE); + + let target_event_id = event_id!("$1"); + + // Start with a prefilled event cache store that includes the target event. + let ev1 = f.text_msg("bonjour monde").event_id(target_event_id).into_event(); + + let event_cache_store = Arc::new(MemoryStore::new()); + event_cache_store + .handle_linked_chunk_updates( + LinkedChunkId::Room(room_id), + vec![ + // An empty items chunk. + Update::NewItemsChunk { previous: None, new: ChunkIdentifier::new(0), next: None }, + Update::PushItems { + at: Position::new(ChunkIdentifier::new(0), 0), + items: vec![ev1.clone()], + }, + ], + ) + .await + .unwrap(); + + let client = server + .client_builder() + .store_config( + StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()), + ) + .build() + .await; + + let event_cache = client.event_cache(); + event_cache.subscribe().unwrap(); + + let room = server.sync_joined_room(&client, room_id).await; + + let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); + + let (initial_events, mut listener) = room_event_cache.subscribe().await; + assert_eq!(initial_events.len(), 1); + assert!(listener.recv().now_or_never().is_none()); + + // Sanity check: there are no relations for the target event yet. + let (_, relations) = + room_event_cache.find_event_with_relations(target_event_id, None).await.unwrap(); + assert!(relations.is_empty()); + + let edit2 = event_id!("$edit2"); + let ev2 = f + .text_msg("* hola mundo") + .edit(target_event_id, RoomMessageEventContentWithoutRelation::text_plain("hola mundo")) + .event_id(edit2) + .into_raw(); + + let edit3 = event_id!("$edit3"); + let ev3 = f + .text_msg("* ciao mondo") + .edit(target_event_id, RoomMessageEventContentWithoutRelation::text_plain("ciao mondo")) + .event_id(edit3) + .into_raw(); + + let edit4 = event_id!("$edit4"); + let ev4 = f + .text_msg("* hello world") + .edit(target_event_id, RoomMessageEventContentWithoutRelation::text_plain("hello world")) + .event_id(edit4) + .into_raw(); + + // We receive two edit events via sync, as well as a gap; this will shrink the + // linked chunk. + server + .sync_room( + &client, + JoinedRoomBuilder::new(room_id) + .add_timeline_event(ev3.clone()) + .add_timeline_event(ev4.clone()) + .set_timeline_limited() + .set_timeline_prev_batch("prev_batch"), + ) + .await; + + // Wait for the listener to tell us we've received something. + loop { + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = listener.recv() + ); + // We've received the shrink. + if diffs.iter().any(|diff| matches!(diff, VectorDiff::Clear)) { + break; + } + } + + // At this point, relations are known for the target event. + let (_, relations) = + room_event_cache.find_event_with_relations(target_event_id, None).await.unwrap(); + assert_eq!(relations.len(), 2); + // And the edit events are correctly ordered according to their position in the + // linked chunk. + assert_eq!(relations[0].event_id().unwrap(), edit3); + assert_eq!(relations[1].event_id().unwrap(), edit4); + + // Now, we resolve the gap; this returns ev2, another edit. + server + .mock_room_messages() + .match_from("prev_batch") + .ok(RoomMessagesResponseTemplate::default().events(vec![ev2.clone()])) + .named("room/messages") + .mock_once() + .mount() + .await; + + // Run the pagination. + let outcome = room_event_cache.pagination().run_backwards_once(1).await.unwrap(); + assert!(outcome.reached_start.not()); + assert_eq!(outcome.events.len(), 1); + + { + // Sanity check: we load the first chunk with the first event, from disk, and + // reach the start of the timeline. + let outcome = room_event_cache.pagination().run_backwards_once(1).await.unwrap(); + assert!(outcome.reached_start); + } + + // Relations are returned accordingly. + let (_, relations) = + room_event_cache.find_event_with_relations(target_event_id, None).await.unwrap(); + assert_eq!(relations.len(), 3); + assert_eq!(relations[0].event_id().unwrap(), edit2); + assert_eq!(relations[1].event_id().unwrap(), edit3); + assert_eq!(relations[2].event_id().unwrap(), edit4); + + // If I save an additional event without storing it in the linked chunk, it will + // be present at the start of the relations list. + let edit5 = event_id!("$edit5"); + let ev5 = f + .text_msg("* hallo Welt") + .edit(target_event_id, RoomMessageEventContentWithoutRelation::text_plain("hallo Welt")) + .event_id(edit5) + .into_event(); + + server.mock_room_event().ok(ev5).mock_once().mount().await; + + // This saves the event, but without a position. + room.event(edit5, None).await.unwrap(); + + let (_, relations) = + room_event_cache.find_event_with_relations(target_event_id, None).await.unwrap(); + assert_eq!(relations.len(), 4); + assert_eq!(relations[0].event_id().unwrap(), edit5); + assert_eq!(relations[1].event_id().unwrap(), edit2); + assert_eq!(relations[2].event_id().unwrap(), edit3); + assert_eq!(relations[3].event_id().unwrap(), edit4); +}