Skip to content

Commit 6a1576a

Browse files
committed
refactor(timeline): make use of the threaded event cache in the timeline
1 parent a1f028c commit 6a1576a

File tree

10 files changed

+179
-147
lines changed

10 files changed

+179
-147
lines changed

bindings/matrix-sdk-ffi/src/timeline/configuration.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ pub enum TimelineFocus {
7979
Thread {
8080
/// The thread root event ID to focus on.
8181
root_event_id: String,
82-
num_events: u16,
8382
},
8483
PinnedEvents {
8584
max_events_to_load: u16,
@@ -108,15 +107,15 @@ impl TryFrom<TimelineFocus> for matrix_sdk_ui::timeline::TimelineFocus {
108107
hide_threaded_events,
109108
})
110109
}
111-
TimelineFocus::Thread { root_event_id, num_events } => {
110+
TimelineFocus::Thread { root_event_id } => {
112111
let parsed_root_event_id = EventId::parse(&root_event_id).map_err(|err| {
113112
FocusEventError::InvalidEventId {
114113
event_id: root_event_id.clone(),
115114
err: err.to_string(),
116115
}
117116
})?;
118117

119-
Ok(Self::Thread { root_event_id: parsed_root_event_id, num_events })
118+
Ok(Self::Thread { root_event_id: parsed_root_event_id })
120119
}
121120
TimelineFocus::PinnedEvents { max_events_to_load, max_concurrent_requests } => {
122121
Ok(Self::PinnedEvents { max_events_to_load, max_concurrent_requests })

crates/matrix-sdk-ui/src/timeline/builder.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ use super::{
2626
use crate::{
2727
timeline::{
2828
controller::spawn_crypto_tasks,
29-
tasks::{pinned_events_task, room_event_cache_updates_task, room_send_queue_update_task},
29+
tasks::{
30+
pinned_events_task, room_event_cache_updates_task, room_send_queue_update_task,
31+
thread_updates_task,
32+
},
3033
},
3134
unable_to_decrypt_hook::UtdHookManager,
3235
};
@@ -203,6 +206,35 @@ impl TimelineBuilder {
203206
.instrument(span)
204207
});
205208

209+
let thread_update_join_handle = if let Some(root) = controller.thread_root() {
210+
Some({
211+
let span = info_span!(
212+
parent: Span::none(),
213+
"thread_live_update_handler",
214+
room_id = ?room.room_id(),
215+
focus = focus.debug_string(),
216+
prefix = internal_id_prefix
217+
);
218+
span.follows_from(Span::current());
219+
220+
// Note: must be done here *before* spawning the task, to avoid race conditions
221+
// with event cache updates happening in the background.
222+
let (_events, receiver) = room_event_cache.subscribe_to_thread(root.clone()).await;
223+
224+
spawn(
225+
thread_updates_task(
226+
receiver,
227+
room_event_cache.clone(),
228+
controller.clone(),
229+
root,
230+
)
231+
.instrument(span),
232+
)
233+
})
234+
} else {
235+
None
236+
};
237+
206238
let local_echo_listener_handle = {
207239
let timeline_controller = controller.clone();
208240
let (local_echoes, send_queue_stream) = room.send_queue().subscribe().await?;
@@ -234,6 +266,7 @@ impl TimelineBuilder {
234266
drop_handle: Arc::new(TimelineDropHandle {
235267
_crypto_drop_handles: crypto_drop_handles,
236268
room_update_join_handle,
269+
thread_update_join_handle,
237270
pinned_events_join_handle,
238271
local_echo_listener_handle,
239272
_event_cache_drop_handle: event_cache_drop,

crates/matrix-sdk-ui/src/timeline/controller/mod.rs

Lines changed: 18 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use matrix_sdk::{crypto::OlmMachine, SendOutsideWasm};
2525
use matrix_sdk::{
2626
deserialized_responses::TimelineEvent,
2727
event_cache::{RoomEventCache, RoomPaginationStatus},
28-
paginators::{thread::ThreadedEventsLoader, PaginationResult, Paginator},
28+
paginators::{PaginationResult, Paginator},
2929
send_queue::{
3030
LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
3131
},
@@ -115,9 +115,8 @@ pub(in crate::timeline) enum TimelineFocusKind<P: RoomDataProvider> {
115115
hide_threaded_events: bool,
116116
},
117117

118+
/// A live timeline for a thread.
118119
Thread {
119-
loader: ThreadedEventsLoader<P>,
120-
121120
/// The root event for the current thread.
122121
root_event_id: OwnedEventId,
123122
},
@@ -285,13 +284,9 @@ impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {
285284
TimelineFocusKind::Event { paginator, hide_threaded_events }
286285
}
287286

288-
TimelineFocus::Thread { root_event_id, .. } => TimelineFocusKind::Thread {
289-
loader: ThreadedEventsLoader::new(
290-
room_data_provider.clone(),
291-
root_event_id.clone(),
292-
),
293-
root_event_id,
294-
},
287+
TimelineFocus::Thread { root_event_id, .. } => {
288+
TimelineFocusKind::Thread { root_event_id }
289+
}
295290

296291
TimelineFocus::PinnedEvents { max_events_to_load, max_concurrent_requests } => {
297292
TimelineFocusKind::PinnedEvents {
@@ -381,25 +376,17 @@ impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {
381376
Ok(has_events)
382377
}
383378

384-
TimelineFocus::Thread { num_events, .. } => {
385-
let TimelineFocusKind::Thread { loader, .. } = &*self.focus else {
386-
// Note: this is sync'd with code in the ctor.
387-
unreachable!();
388-
};
389-
390-
let result = loader
391-
.paginate_backwards((*num_events).into())
392-
.await
393-
.map_err(PaginationError::Paginator)?;
379+
TimelineFocus::Thread { root_event_id, .. } => {
380+
let (events, _) = room_event_cache.subscribe_to_thread(root_event_id.clone()).await;
381+
let has_events = !events.is_empty();
394382

395-
// Events are in reverse topological order.
396383
self.replace_with_initial_remote_events(
397-
result.events.into_iter().rev(),
398-
RemoteEventOrigin::Pagination,
384+
events.into_iter(),
385+
RemoteEventOrigin::Cache,
399386
)
400387
.await;
401388

402-
Ok(true)
389+
Ok(has_events)
403390
}
404391

405392
TimelineFocus::PinnedEvents { .. } => {
@@ -501,17 +488,15 @@ impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {
501488
num_events: u16,
502489
) -> Result<bool, PaginationError> {
503490
let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
504-
TimelineFocusKind::Live { .. } | TimelineFocusKind::PinnedEvents { .. } => {
491+
TimelineFocusKind::Live { .. }
492+
| TimelineFocusKind::PinnedEvents { .. }
493+
| TimelineFocusKind::Thread { .. } => {
505494
return Err(PaginationError::NotSupported);
506495
}
507496
TimelineFocusKind::Event { paginator, .. } => paginator
508497
.paginate_backward(num_events.into())
509498
.await
510499
.map_err(PaginationError::Paginator)?,
511-
TimelineFocusKind::Thread { loader, .. } => loader
512-
.paginate_backwards(num_events.into())
513-
.await
514-
.map_err(PaginationError::Paginator)?,
515500
};
516501

517502
// Events are in reverse topological order.
@@ -560,6 +545,10 @@ impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {
560545
matches!(&*self.focus, TimelineFocusKind::Live { .. })
561546
}
562547

548+
pub(super) fn thread_root(&self) -> Option<OwnedEventId> {
549+
as_variant!(&*self.focus, TimelineFocusKind::Thread { root_event_id } => root_event_id.clone())
550+
}
551+
563552
/// Get a copy of the current items in the list.
564553
///
565554
/// Cheap because `im::Vector` is cheap to clone.

crates/matrix-sdk-ui/src/timeline/mod.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -146,11 +146,7 @@ pub enum TimelineFocus {
146146
},
147147

148148
/// Focus on a specific thread
149-
Thread {
150-
root_event_id: OwnedEventId,
151-
/// Number of initial events to load on the first /relations request.
152-
num_events: u16,
153-
},
149+
Thread { root_event_id: OwnedEventId },
154150

155151
/// Only show pinned events.
156152
PinnedEvents { max_events_to_load: u16, max_concurrent_requests: u16 },
@@ -777,6 +773,7 @@ impl Timeline {
777773
struct TimelineDropHandle {
778774
room_update_join_handle: JoinHandle<()>,
779775
pinned_events_join_handle: Option<JoinHandle<()>>,
776+
thread_update_join_handle: Option<JoinHandle<()>>,
780777
local_echo_listener_handle: JoinHandle<()>,
781778
_event_cache_drop_handle: Arc<EventCacheDropHandles>,
782779
_crypto_drop_handles: CryptoDropHandles,
@@ -785,7 +782,11 @@ struct TimelineDropHandle {
785782
impl Drop for TimelineDropHandle {
786783
fn drop(&mut self) {
787784
if let Some(handle) = self.pinned_events_join_handle.take() {
788-
handle.abort()
785+
handle.abort();
786+
}
787+
788+
if let Some(handle) = self.thread_update_join_handle.take() {
789+
handle.abort();
789790
}
790791

791792
self.local_echo_listener_handle.abort();

crates/matrix-sdk-ui/src/timeline/pagination.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ impl super::Timeline {
4545
}
4646

4747
Ok(self.live_paginate_backwards(num_events).await?)
48+
} else if let Some(thread_root) = self.controller.thread_root() {
49+
Ok(self.event_cache.paginate_thread_backwards(thread_root, num_events).await?)
4850
} else {
4951
Ok(self.controller.focused_paginate_backwards(num_events).await?)
5052
}

crates/matrix-sdk-ui/src/timeline/tasks.rs

Lines changed: 60 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ use std::collections::BTreeSet;
1919
use futures_core::Stream;
2020
use futures_util::pin_mut;
2121
use matrix_sdk::{
22-
event_cache::{EventsOrigin, RoomEventCache, RoomEventCacheSubscriber, RoomEventCacheUpdate},
22+
event_cache::{
23+
EventsOrigin, RoomEventCache, RoomEventCacheSubscriber, RoomEventCacheUpdate,
24+
ThreadEventCacheUpdate,
25+
},
2326
send_queue::RoomSendQueueUpdate,
2427
};
2528
use ruma::OwnedEventId;
@@ -71,6 +74,61 @@ pub(in crate::timeline) async fn pinned_events_task<S>(
7174
}
7275
}
7376

77+
/// For a thread-focused timeline, a long-lived task that will listen to the
78+
/// underlying thread updates.
79+
pub(in crate::timeline) async fn thread_updates_task(
80+
mut receiver: Receiver<ThreadEventCacheUpdate>,
81+
room_event_cache: RoomEventCache,
82+
timeline_controller: TimelineController,
83+
root: OwnedEventId,
84+
) {
85+
trace!("Spawned the thread event subscriber task.");
86+
87+
loop {
88+
trace!("Waiting for an event.");
89+
90+
let update = match receiver.recv().await {
91+
Ok(up) => up,
92+
Err(RecvError::Closed) => break,
93+
Err(RecvError::Lagged(num_skipped)) => {
94+
warn!(num_skipped, "Lagged behind event cache updates, resetting timeline");
95+
96+
// The updates might have lagged, but the room event cache might
97+
// have events, so retrieve them and add them back again to the
98+
// timeline, after clearing it.
99+
let (initial_events, _) = room_event_cache.subscribe_to_thread(root.clone()).await;
100+
101+
timeline_controller
102+
.replace_with_initial_remote_events(
103+
initial_events.into_iter(),
104+
RemoteEventOrigin::Cache,
105+
)
106+
.await;
107+
108+
continue;
109+
}
110+
};
111+
112+
trace!("Received new timeline events diffs");
113+
114+
let origin = match update.origin {
115+
EventsOrigin::Sync => RemoteEventOrigin::Sync,
116+
EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
117+
EventsOrigin::Cache => RemoteEventOrigin::Cache,
118+
};
119+
120+
let has_diffs = !update.diffs.is_empty();
121+
122+
timeline_controller.handle_remote_events_with_diffs(update.diffs, origin).await;
123+
124+
if has_diffs && matches!(origin, RemoteEventOrigin::Cache) {
125+
timeline_controller.retry_event_decryption(None).await;
126+
}
127+
}
128+
129+
trace!("Thread event subscriber task finished.");
130+
}
131+
74132
/// Long-lived task that forwards the [`RoomEventCacheUpdate`]s (remote echoes)
75133
/// to the timeline.
76134
pub(in crate::timeline) async fn room_event_cache_updates_task(
@@ -122,10 +180,7 @@ pub(in crate::timeline) async fn room_event_cache_updates_task(
122180

123181
let has_diffs = !diffs.is_empty();
124182

125-
if matches!(
126-
timeline_focus,
127-
TimelineFocus::Live { .. } | TimelineFocus::Thread { .. }
128-
) {
183+
if matches!(timeline_focus, TimelineFocus::Live { .. }) {
129184
timeline_controller.handle_remote_events_with_diffs(diffs, origin).await;
130185
} else {
131186
// Only handle the remote aggregation for a non-live timeline.

crates/matrix-sdk-ui/tests/integration/timeline/read_receipts.rs

Lines changed: 3 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@ use eyeball_im::VectorDiff;
2020
use futures_util::StreamExt;
2121
use matrix_sdk::{
2222
room::Receipts,
23-
test_utils::mocks::{
24-
MatrixMockServer, RoomMessagesResponseTemplate, RoomRelationsResponseTemplate,
25-
},
23+
test_utils::mocks::{MatrixMockServer, RoomMessagesResponseTemplate},
2624
};
2725
use matrix_sdk_test::{
2826
async_test, event_factory::EventFactory, JoinedRoomBuilder, RoomAccountDataTestEvent, ALICE,
@@ -744,32 +742,9 @@ async fn test_send_single_receipt_threaded() {
744742
// body `thread_id` parameter value or no value at all for fully read
745743
// markers.
746744

747-
server
748-
.mock_room_event()
749-
.match_event_id()
750-
.ok(EventFactory::new()
751-
.text_msg("Thread root")
752-
.sender(user_id!("@alice:b.c"))
753-
.event_id(thread_root_event_id)
754-
.into())
755-
.mock_once()
756-
.mount()
757-
.await;
758-
759-
server
760-
.mock_room_relations()
761-
.match_target_event(thread_root_event_id.to_owned())
762-
.ok(RoomRelationsResponseTemplate::default())
763-
.mock_once()
764-
.mount()
765-
.await;
766-
767745
let timeline = room
768746
.timeline_builder()
769-
.with_focus(TimelineFocus::Thread {
770-
root_event_id: thread_root_event_id.to_owned(),
771-
num_events: 1,
772-
})
747+
.with_focus(TimelineFocus::Thread { root_event_id: thread_root_event_id.to_owned() })
773748
.build()
774749
.await
775750
.unwrap();
@@ -799,7 +774,7 @@ async fn test_send_single_receipt_threaded() {
799774
.await,
800775
server
801776
.mock_send_receipt(CreateReceiptType::FullyRead)
802-
.body_matches_partial_json(json!({}))
777+
.body_json(json!({}))
803778
.ok()
804779
.expect(1)
805780
.named("Fully-read marker")

0 commit comments

Comments
 (0)