From b609637caa60c3b98f77938b953ed5b24352bba4 Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Mon, 7 Jul 2025 19:58:22 +0200 Subject: [PATCH 01/30] feat(send_queue): add global setting for sending media progress updates Signed-off-by: Johannes Marbach --- bindings/matrix-sdk-ffi/src/client_builder.rs | 19 ++++++++++++++++++- crates/matrix-sdk/src/client/builder/mod.rs | 17 ++++++++++++++++- crates/matrix-sdk/src/send_queue/mod.rs | 11 ++++++++++- 3 files changed, 44 insertions(+), 3 deletions(-) diff --git a/bindings/matrix-sdk-ffi/src/client_builder.rs b/bindings/matrix-sdk-ffi/src/client_builder.rs index 2d814340a56..a143f4d4fcf 100644 --- a/bindings/matrix-sdk-ffi/src/client_builder.rs +++ b/bindings/matrix-sdk-ffi/src/client_builder.rs @@ -129,6 +129,7 @@ pub struct ClientBuilder { decryption_settings: DecryptionSettings, enable_share_history_on_invite: bool, request_config: Option, + enable_send_queue_media_upload_progress_reporting: bool, #[cfg(not(target_family = "wasm"))] user_agent: Option, @@ -180,6 +181,7 @@ impl ClientBuilder { enable_share_history_on_invite: false, request_config: Default::default(), threads_enabled: false, + enable_send_queue_media_upload_progress_reporting: false, }) } @@ -386,6 +388,18 @@ impl ClientBuilder { Arc::new(builder) } + /// Set whether to enable progress reporting for media uploads in the send + /// queue. + pub fn enable_send_queue_media_upload_progress_reporting( + self: Arc, + enable_send_queue_media_upload_progress_reporting: bool, + ) -> Arc { + let mut builder = unwrap_or_clone_arc(self); + builder.enable_send_queue_media_upload_progress_reporting = + enable_send_queue_media_upload_progress_reporting; + Arc::new(builder) + } + /// Add a default request config to this client. pub fn request_config(self: Arc, config: RequestConfig) -> Arc { let mut builder = unwrap_or_clone_arc(self); @@ -524,7 +538,10 @@ impl ClientBuilder { .with_encryption_settings(builder.encryption_settings) .with_room_key_recipient_strategy(builder.room_key_recipient_strategy) .with_decryption_settings(builder.decryption_settings) - .with_enable_share_history_on_invite(builder.enable_share_history_on_invite); + .with_enable_share_history_on_invite(builder.enable_share_history_on_invite) + .with_enable_send_queue_media_upload_progress_reporting( + builder.enable_send_queue_media_upload_progress_reporting, + ); match builder.sliding_sync_version_builder { SlidingSyncVersionBuilder::None => { diff --git a/crates/matrix-sdk/src/client/builder/mod.rs b/crates/matrix-sdk/src/client/builder/mod.rs index bbaa84a95f1..250a3cc3480 100644 --- a/crates/matrix-sdk/src/client/builder/mod.rs +++ b/crates/matrix-sdk/src/client/builder/mod.rs @@ -114,6 +114,7 @@ pub struct ClientBuilder { enable_share_history_on_invite: bool, cross_process_store_locks_holder_name: String, threading_support: ThreadingSupport, + enable_send_queue_media_upload_progress_reporting: bool, } impl ClientBuilder { @@ -145,6 +146,7 @@ impl ClientBuilder { cross_process_store_locks_holder_name: Self::DEFAULT_CROSS_PROCESS_STORE_LOCKS_HOLDER_NAME.to_owned(), threading_support: ThreadingSupport::Disabled, + enable_send_queue_media_upload_progress_reporting: false, } } @@ -489,6 +491,16 @@ impl ClientBuilder { self } + /// Set whether to report media upload progress via the send queue. + pub fn with_enable_send_queue_media_upload_progress_reporting( + mut self, + enable_send_queue_media_upload_progress_reporting: bool, + ) -> Self { + self.enable_send_queue_media_upload_progress_reporting = + enable_send_queue_media_upload_progress_reporting; + self + } + /// Create a [`Client`] with the options set on this builder. /// /// # Errors @@ -573,7 +585,10 @@ impl ClientBuilder { }); // Enable the send queue by default. - let send_queue = Arc::new(SendQueueData::new(true)); + let send_queue = Arc::new(SendQueueData::new( + true, + self.enable_send_queue_media_upload_progress_reporting, + )); let server_info = ClientServerInfo { server_versions: match self.server_versions { diff --git a/crates/matrix-sdk/src/send_queue/mod.rs b/crates/matrix-sdk/src/send_queue/mod.rs index 5f4ac0e8fc7..412f644acf6 100644 --- a/crates/matrix-sdk/src/send_queue/mod.rs +++ b/crates/matrix-sdk/src/send_queue/mod.rs @@ -246,6 +246,7 @@ impl SendQueue { data.is_dropping.clone(), &self.client, owned_room_id.clone(), + data.report_media_upload_progress.clone(), ); map.insert(owned_room_id, room_q.clone()); @@ -330,11 +331,14 @@ pub(super) struct SendQueueData { /// Are we currently dropping the Client? is_dropping: Arc, + + /// Will media upload progress be reported via send queue updates? + report_media_upload_progress: Arc, } impl SendQueueData { /// Create the data for a send queue, in the given enabled state. - pub fn new(globally_enabled: bool) -> Self { + pub fn new(globally_enabled: bool, report_media_upload_progress: bool) -> Self { let (sender, _) = broadcast::channel(32); Self { @@ -342,6 +346,7 @@ impl SendQueueData { globally_enabled: AtomicBool::new(globally_enabled), error_reporter: sender, is_dropping: Arc::new(false.into()), + report_media_upload_progress: Arc::new(AtomicBool::new(report_media_upload_progress)), } } } @@ -389,6 +394,7 @@ impl RoomSendQueue { is_dropping: Arc, client: &Client, room_id: OwnedRoomId, + report_media_upload_progress: Arc, ) -> Self { let (updates_sender, _) = broadcast::channel(32); @@ -406,6 +412,7 @@ impl RoomSendQueue { locally_enabled.clone(), global_error_reporter, is_dropping, + report_media_upload_progress, )); Self { @@ -514,6 +521,7 @@ impl RoomSendQueue { /// /// It only progresses forward: nothing can be cancelled at any point, which /// makes the implementation not overly complicated to follow. + #[allow(clippy::too_many_arguments)] #[instrument(skip_all, fields(room_id = %room.room_id()))] async fn sending_task( room: WeakRoom, @@ -523,6 +531,7 @@ impl RoomSendQueue { locally_enabled: Arc, global_error_reporter: broadcast::Sender, is_dropping: Arc, + _report_media_upload_progress: Arc, ) { trace!("spawned the sending task"); From 98ded6df049e5f0c5af75e7988fb993cd204fb3c Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Mon, 7 Jul 2025 19:58:28 +0200 Subject: [PATCH 02/30] chore(send_queue): collect thumbnail-related metadata in a dedicated QueueThumbnailInfo struct for easier extension Signed-off-by: Johannes Marbach --- crates/matrix-sdk/src/send_queue/mod.rs | 83 ++++++++++++++-------- crates/matrix-sdk/src/send_queue/upload.rs | 16 +++-- 2 files changed, 63 insertions(+), 36 deletions(-) diff --git a/crates/matrix-sdk/src/send_queue/mod.rs b/crates/matrix-sdk/src/send_queue/mod.rs index 412f644acf6..915dc9cbd2d 100644 --- a/crates/matrix-sdk/src/send_queue/mod.rs +++ b/crates/matrix-sdk/src/send_queue/mod.rs @@ -291,6 +291,20 @@ impl SendQueue { } } +/// Metadata about a thumbnail needed when pushing media uploads to the send +/// queue. +#[derive(Clone, Debug)] +struct QueueThumbnailInfo { + /// Metadata about the thumbnail needed when finishing a media upload. + finish_upload_thumbnail_info: FinishUploadThumbnailInfo, + + /// The parameters for the request to retrieve the thumbnail data. + media_request_parameters: MediaRequestParameters, + + /// The thumbnail's mime type. + content_type: Mime, +} + /// A specific room's send queue ran into an error, and it has disabled itself. #[derive(Clone, Debug)] pub struct SendQueueRoomError { @@ -1214,7 +1228,7 @@ impl QueueStorage { created_at: MilliSecondsSinceUnixEpoch, upload_file_txn: OwnedTransactionId, file_media_request: MediaRequestParameters, - thumbnail: Option<(FinishUploadThumbnailInfo, MediaRequestParameters, Mime)>, + thumbnail: Option, ) -> Result<(), RoomSendQueueStorageError> { let guard = self.store.lock().await; let client = guard.client()?; @@ -1300,35 +1314,38 @@ impl QueueStorage { thumbnail, } = item_queue_info; - let thumbnail_info = - if let Some((thumbnail_info, thumbnail_media_request, thumbnail_content_type)) = - thumbnail - { - let upload_thumbnail_txn = thumbnail_info.txn.clone(); + let thumbnail_info = if let Some(QueueThumbnailInfo { + finish_upload_thumbnail_info: thumbnail_info, + media_request_parameters: thumbnail_media_request, + content_type: thumbnail_content_type, + .. + }) = thumbnail + { + let upload_thumbnail_txn = thumbnail_info.txn.clone(); - // Save the thumbnail upload request as a dependent request of the last file - // upload. - store - .save_dependent_queued_request( - &self.room_id, - &last_upload_file_txn, - upload_thumbnail_txn.clone().into(), - created_at, - DependentQueuedRequestKind::UploadFileOrThumbnail { - content_type: thumbnail_content_type.to_string(), - cache_key: thumbnail_media_request.clone(), - related_to: send_event_txn.clone(), - parent_is_thumbnail_upload: false, - }, - ) - .await?; + // Save the thumbnail upload request as a dependent request of the last file + // upload. + store + .save_dependent_queued_request( + &self.room_id, + &last_upload_file_txn, + upload_thumbnail_txn.clone().into(), + created_at, + DependentQueuedRequestKind::UploadFileOrThumbnail { + content_type: thumbnail_content_type.to_string(), + cache_key: thumbnail_media_request.clone(), + related_to: send_event_txn.clone(), + parent_is_thumbnail_upload: false, + }, + ) + .await?; - last_upload_file_txn = upload_thumbnail_txn; + last_upload_file_txn = upload_thumbnail_txn; - Some(thumbnail_info) - } else { - None - }; + Some(thumbnail_info) + } else { + None + }; // Save the file upload as a dependent request of the previous upload. store @@ -1386,9 +1403,15 @@ impl QueueStorage { created_at: MilliSecondsSinceUnixEpoch, upload_file_txn: OwnedTransactionId, file_media_request: MediaRequestParameters, - thumbnail: Option<(FinishUploadThumbnailInfo, MediaRequestParameters, Mime)>, + thumbnail: Option, ) -> Result, RoomSendQueueStorageError> { - if let Some((thumbnail_info, thumbnail_media_request, thumbnail_content_type)) = thumbnail { + if let Some(QueueThumbnailInfo { + finish_upload_thumbnail_info: thumbnail_info, + media_request_parameters: thumbnail_media_request, + content_type: thumbnail_content_type, + .. + }) = thumbnail + { let upload_thumbnail_txn = thumbnail_info.txn.clone(); // Save the thumbnail upload request. @@ -1980,7 +2003,7 @@ struct GalleryItemQueueInfo { content_type: Mime, upload_file_txn: OwnedTransactionId, file_media_request: MediaRequestParameters, - thumbnail: Option<(FinishUploadThumbnailInfo, MediaRequestParameters, Mime)>, + thumbnail: Option, } /// The content of a local echo. diff --git a/crates/matrix-sdk/src/send_queue/upload.rs b/crates/matrix-sdk/src/send_queue/upload.rs index f3e61757e6d..08bcba42e43 100644 --- a/crates/matrix-sdk/src/send_queue/upload.rs +++ b/crates/matrix-sdk/src/send_queue/upload.rs @@ -46,7 +46,7 @@ use ruma::{ }; use tracing::{debug, error, instrument, trace, warn, Span}; -use super::{QueueStorage, RoomSendQueue, RoomSendQueueError}; +use super::{QueueStorage, QueueThumbnailInfo, RoomSendQueue, RoomSendQueueError}; use crate::{ attachment::{AttachmentConfig, Thumbnail}, room::edit::update_media_caption, @@ -170,7 +170,7 @@ fn update_gallery_event_after_upload( struct MediaCacheResult { upload_thumbnail_txn: Option, event_thumbnail_info: Option<(MediaSource, Box)>, - queue_thumbnail_info: Option<(FinishUploadThumbnailInfo, MediaRequestParameters, Mime)>, + queue_thumbnail_info: Option, } impl RoomSendQueue { @@ -464,11 +464,15 @@ impl RoomSendQueue { thumbnail_media_request.source.clone(), thumbnail_info, )), - queue_thumbnail_info: Some(( - FinishUploadThumbnailInfo { txn, width: None, height: None }, - thumbnail_media_request, + queue_thumbnail_info: Some(QueueThumbnailInfo { + finish_upload_thumbnail_info: FinishUploadThumbnailInfo { + txn, + width: None, + height: None, + }, + media_request_parameters: thumbnail_media_request, content_type, - )), + }), }) } else { Ok(Default::default()) From ed365400fd12ba61691171381870f7a0aa34e24f Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Mon, 7 Jul 2025 19:58:29 +0200 Subject: [PATCH 03/30] feat(send_queue): cache thumbnail sizes to use them in progress reporting later Signed-off-by: Johannes Marbach --- crates/matrix-sdk/src/send_queue/mod.rs | 30 +++++++++++++++++++--- crates/matrix-sdk/src/send_queue/upload.rs | 2 ++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/crates/matrix-sdk/src/send_queue/mod.rs b/crates/matrix-sdk/src/send_queue/mod.rs index 915dc9cbd2d..28073d5e8b3 100644 --- a/crates/matrix-sdk/src/send_queue/mod.rs +++ b/crates/matrix-sdk/src/send_queue/mod.rs @@ -303,6 +303,9 @@ struct QueueThumbnailInfo { /// The thumbnail's mime type. content_type: Mime, + + /// The thumbnail's file size in bytes. + file_size: usize, } /// A specific room's send queue ran into an error, and it has disabled itself. @@ -964,6 +967,10 @@ struct QueueStorage { /// To which room is this storage related. room_id: OwnedRoomId, + + /// In-memory mapping of media transaction IDs to thumbnail sizes for the + /// purpose of progress reporting. + thumbnail_size_cache: Arc>>>>, } impl QueueStorage { @@ -975,7 +982,11 @@ impl QueueStorage { /// Create a new queue for queuing requests to be sent later. fn new(client: WeakClient, room: OwnedRoomId) -> Self { - Self { room_id: room, store: StoreLock { client, being_sent: Default::default() } } + Self { + room_id: room, + store: StoreLock { client, being_sent: Default::default() }, + thumbnail_size_cache: Arc::new(Mutex::new(HashMap::new())), + } } /// Push a new event to be sent in the queue, with a default priority of 0. @@ -1132,6 +1143,8 @@ impl QueueStorage { warn!(txn_id = %transaction_id, "request marked as sent was missing from storage"); } + self.thumbnail_size_cache.lock().await.remove(transaction_id); + Ok(()) } @@ -1172,6 +1185,8 @@ impl QueueStorage { .remove_send_queue_request(&self.room_id, transaction_id) .await?; + self.thumbnail_size_cache.lock().await.remove(transaction_id); + Ok(removed) } @@ -1234,6 +1249,8 @@ impl QueueStorage { let client = guard.client()?; let store = client.state_store(); + let media_sizes = vec![thumbnail.as_ref().map(|t| t.file_size)]; + let thumbnail_info = self .push_thumbnail_and_media_uploads( store, @@ -1251,7 +1268,7 @@ impl QueueStorage { .save_dependent_queued_request( &self.room_id, &upload_file_txn, - send_event_txn.into(), + send_event_txn.clone().into(), created_at, DependentQueuedRequestKind::FinishUpload { local_echo: Box::new(event), @@ -1261,6 +1278,8 @@ impl QueueStorage { ) .await?; + self.thumbnail_size_cache.lock().await.insert(send_event_txn, media_sizes); + Ok(()) } @@ -1281,6 +1300,7 @@ impl QueueStorage { let store = client.state_store(); let mut finish_item_infos = Vec::with_capacity(item_queue_infos.len()); + let mut media_sizes = Vec::with_capacity(item_queue_infos.len()); let Some((first, rest)) = item_queue_infos.split_first() else { return Ok(()); @@ -1303,6 +1323,7 @@ impl QueueStorage { finish_item_infos .push(FinishGalleryItemInfo { file_upload: upload_file_txn.clone(), thumbnail_info }); + media_sizes.push(thumbnail.as_ref().map(|t| t.file_size)); let mut last_upload_file_txn = upload_file_txn.clone(); @@ -1367,6 +1388,7 @@ impl QueueStorage { file_upload: upload_file_txn.clone(), thumbnail_info: thumbnail_info.cloned(), }); + media_sizes.push(thumbnail.as_ref().map(|t| t.file_size)); last_upload_file_txn = upload_file_txn.clone(); } @@ -1377,7 +1399,7 @@ impl QueueStorage { .save_dependent_queued_request( &self.room_id, &last_upload_file_txn, - send_event_txn.into(), + send_event_txn.clone().into(), created_at, DependentQueuedRequestKind::FinishGallery { local_echo: Box::new(event), @@ -1386,6 +1408,8 @@ impl QueueStorage { ) .await?; + self.thumbnail_size_cache.lock().await.insert(send_event_txn, media_sizes); + Ok(()) } diff --git a/crates/matrix-sdk/src/send_queue/upload.rs b/crates/matrix-sdk/src/send_queue/upload.rs index 08bcba42e43..99ddec91ef5 100644 --- a/crates/matrix-sdk/src/send_queue/upload.rs +++ b/crates/matrix-sdk/src/send_queue/upload.rs @@ -445,6 +445,7 @@ impl RoomSendQueue { // Create the information required for filling the thumbnail section of the // event. let (data, content_type, thumbnail_info) = thumbnail.into_parts(); + let file_size = data.len(); // Cache thumbnail in the cache store. let thumbnail_media_request = Media::make_local_file_media_request(&txn); @@ -472,6 +473,7 @@ impl RoomSendQueue { }, media_request_parameters: thumbnail_media_request, content_type, + file_size, }), }) } else { From 84fa08780b7e933244b880f40d84bae270adee94 Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Mon, 7 Jul 2025 19:58:31 +0200 Subject: [PATCH 04/30] chore(send_queue): Make parent_is_thumbnail_upload available outside of unstable-msc4274 to use it during media progress reporting later Signed-off-by: Johannes Marbach --- crates/matrix-sdk-base/src/store/send_queue.rs | 2 -- crates/matrix-sdk/src/send_queue/mod.rs | 14 -------------- 2 files changed, 16 deletions(-) diff --git a/crates/matrix-sdk-base/src/store/send_queue.rs b/crates/matrix-sdk-base/src/store/send_queue.rs index 877ad0b4b39..17b530d1504 100644 --- a/crates/matrix-sdk-base/src/store/send_queue.rs +++ b/crates/matrix-sdk-base/src/store/send_queue.rs @@ -238,7 +238,6 @@ pub enum DependentQueuedRequestKind { related_to: OwnedTransactionId, /// Whether the depended upon request was a thumbnail or a file upload. - #[cfg(feature = "unstable-msc4274")] #[serde(default = "default_parent_is_thumbnail_upload")] parent_is_thumbnail_upload: bool, }, @@ -275,7 +274,6 @@ pub enum DependentQueuedRequestKind { /// If parent_is_thumbnail_upload is missing, we assume the request is for a /// file upload following a thumbnail upload. This was the only possible case /// before parent_is_thumbnail_upload was introduced. -#[cfg(feature = "unstable-msc4274")] fn default_parent_is_thumbnail_upload() -> bool { true } diff --git a/crates/matrix-sdk/src/send_queue/mod.rs b/crates/matrix-sdk/src/send_queue/mod.rs index 28073d5e8b3..0f5dd1128c2 100644 --- a/crates/matrix-sdk/src/send_queue/mod.rs +++ b/crates/matrix-sdk/src/send_queue/mod.rs @@ -1467,7 +1467,6 @@ impl QueueStorage { content_type: content_type.to_string(), cache_key: file_media_request, related_to: send_event_txn, - #[cfg(feature = "unstable-msc4274")] parent_is_thumbnail_upload: true, }, ) @@ -1859,25 +1858,12 @@ impl QueueStorage { content_type, cache_key, related_to, - #[cfg(feature = "unstable-msc4274")] parent_is_thumbnail_upload, } => { let Some(parent_key) = parent_key else { // Not finished yet, we should retry later => false. return Ok(false); }; - let parent_is_thumbnail_upload = { - cfg_if::cfg_if! { - if #[cfg(feature = "unstable-msc4274")] { - parent_is_thumbnail_upload - } else { - // Before parent_is_thumbnail_upload was introduced, the only - // possible usage for this request was a file upload following - // a thumbnail upload. - true - } - } - }; self.handle_dependent_file_or_thumbnail_upload( client, dependent_request.own_transaction_id.into(), From 1c9436b0c4ca2eff6fc4ffb3bb1ccd37a22a45dd Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Mon, 7 Jul 2025 19:58:32 +0200 Subject: [PATCH 05/30] feat(send_queue): enable progress monitoring in RoomSendQueue::handle_request Signed-off-by: Johannes Marbach --- crates/matrix-sdk/src/encryption/futures.rs | 2 +- crates/matrix-sdk/src/send_queue/mod.rs | 39 +++++++++++++++------ 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/crates/matrix-sdk/src/encryption/futures.rs b/crates/matrix-sdk/src/encryption/futures.rs index a487ff8ed21..8db56bef539 100644 --- a/crates/matrix-sdk/src/encryption/futures.rs +++ b/crates/matrix-sdk/src/encryption/futures.rs @@ -30,7 +30,7 @@ use crate::{config::RequestConfig, Client, Media, Result, TransmissionProgress}; pub struct UploadEncryptedFile<'a, R: ?Sized> { client: &'a Client, reader: &'a mut R, - send_progress: SharedObservable, + pub(crate) send_progress: SharedObservable, request_config: Option, } diff --git a/crates/matrix-sdk/src/send_queue/mod.rs b/crates/matrix-sdk/src/send_queue/mod.rs index 0f5dd1128c2..aa23e457ad7 100644 --- a/crates/matrix-sdk/src/send_queue/mod.rs +++ b/crates/matrix-sdk/src/send_queue/mod.rs @@ -138,6 +138,7 @@ use std::{ }; use as_variant::as_variant; +use eyeball::SharedObservable; #[cfg(feature = "unstable-msc4274")] use matrix_sdk_base::store::FinishGalleryItemInfo; use matrix_sdk_base::{ @@ -176,7 +177,7 @@ use crate::{ config::RequestConfig, error::RetryKind, room::{edit::EditedContent, WeakRoom}, - Client, Media, Room, + Client, Media, Room, TransmissionProgress, }; mod upload; @@ -606,7 +607,7 @@ impl RoomSendQueue { continue; }; - match Self::handle_request(&room, queued_request, cancel_upload_rx).await { + match Self::handle_request(&room, queued_request, cancel_upload_rx, None).await { Ok(Some(parent_key)) => match queue.mark_as_sent(&txn_id, parent_key.clone()).await { Ok(()) => match parent_key { @@ -707,6 +708,7 @@ impl RoomSendQueue { room: &Room, request: QueuedRequest, cancel_upload_rx: Option>, + progress: Option>, ) -> Result, crate::Error> { match request.kind { QueuedRequestKind::Event { content } => { @@ -754,18 +756,29 @@ impl RoomSendQueue { let media_source = if room.latest_encryption_state().await?.is_encrypted() { trace!("upload will be encrypted (encrypted room)"); let mut cursor = std::io::Cursor::new(data); - let encrypted_file = room - .client() + let client = room.client(); + let mut req = client .upload_encrypted_file(&mut cursor) - .with_request_config(RequestConfig::short_retry()) - .await?; + .with_request_config(RequestConfig::short_retry()); + + if let Some(progress) = progress { + req = req.with_send_progress_observable(progress); + } + + let encrypted_file = req.await?; MediaSource::Encrypted(Box::new(encrypted_file)) } else { trace!("upload will be in clear text (room without encryption)"); let request_config = RequestConfig::short_retry() .timeout(Media::reasonable_upload_timeout(&data)); - let res = - room.client().media().upload(&mime, data, Some(request_config)).await?; + let mut req = + room.client().media().upload(&mime, data, Some(request_config)); + + if let Some(progress) = progress { + req = req.with_send_progress_observable(progress); + } + + let res = req.await?; MediaSource::Plain(res.content_uri) }; @@ -773,8 +786,14 @@ impl RoomSendQueue { let media_source = { let request_config = RequestConfig::short_retry() .timeout(Media::reasonable_upload_timeout(&data)); - let res = - room.client().media().upload(&mime, data, Some(request_config)).await?; + let mut req = + room.client().media().upload(&mime, data, Some(request_config)); + + if let Some(progress) = progress { + req = req.with_send_progress_observable(progress); + } + + let res = req.await?; MediaSource::Plain(res.content_uri) }; From d2f47f3880024e782af0164503ebbf0a2ddb1cbf Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Mon, 7 Jul 2025 19:58:33 +0200 Subject: [PATCH 06/30] feat(sdk): introduce AbstractProgress for tracking media progress in pseudo units Signed-off-by: Johannes Marbach --- crates/matrix-sdk/src/http_client/mod.rs | 15 +++++++++++++++ crates/matrix-sdk/src/lib.rs | 2 +- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/crates/matrix-sdk/src/http_client/mod.rs b/crates/matrix-sdk/src/http_client/mod.rs index 541baa30683..3dcb3082454 100644 --- a/crates/matrix-sdk/src/http_client/mod.rs +++ b/crates/matrix-sdk/src/http_client/mod.rs @@ -231,6 +231,21 @@ pub struct TransmissionProgress { pub total: usize, } +/// Progress of an operation in abstract units. +/// +/// Contrary to [`TransmissionProgress`], this allows tracking the progress +/// of sending or receiving a payload in estimated pseudo units representing a +/// percentage. This is helpful in cases where the exact progress in bytes isn't +/// known, for instance, because encryption (which changes the size) happens on +/// the fly. +#[derive(Clone, Copy, Debug, Default)] +pub struct AbstractProgress { + /// How many units were already transferred. + pub current: usize, + /// How many units there are in total. + pub total: usize, +} + async fn response_to_http_response( mut response: reqwest::Response, ) -> Result, reqwest::Error> { diff --git a/crates/matrix-sdk/src/lib.rs b/crates/matrix-sdk/src/lib.rs index 2680d4cb1e3..fcef3394424 100644 --- a/crates/matrix-sdk/src/lib.rs +++ b/crates/matrix-sdk/src/lib.rs @@ -74,7 +74,7 @@ pub use error::{ Error, HttpError, HttpResult, NotificationSettingsError, RefreshTokenError, Result, RumaApiError, }; -pub use http_client::TransmissionProgress; +pub use http_client::{AbstractProgress, TransmissionProgress}; #[cfg(all(feature = "e2e-encryption", feature = "sqlite"))] pub use matrix_sdk_sqlite::SqliteCryptoStore; #[cfg(feature = "sqlite")] From 5c0bb6a912bd224f9924a874fc2ca9e8317bea66 Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Mon, 7 Jul 2025 19:58:34 +0200 Subject: [PATCH 07/30] chore(send_queue): rename RoomSendQueueUpdate::UploadedMedia to MediaUpload to prepare it for communicating upload progress Signed-off-by: Johannes Marbach --- crates/matrix-sdk-ui/src/timeline/controller/mod.rs | 2 +- crates/matrix-sdk/src/send_queue/mod.rs | 7 ++++--- crates/matrix-sdk/tests/integration/send_queue.rs | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs index e4a471e0e80..a7a2335d7eb 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs @@ -1317,7 +1317,7 @@ impl TimelineController { .await; } - RoomSendQueueUpdate::UploadedMedia { related_to, .. } => { + RoomSendQueueUpdate::MediaUpload { related_to, .. } => { // TODO(bnjbvr): Do something else? info!(txn_id = %related_to, "some media for a media event has been uploaded"); } diff --git a/crates/matrix-sdk/src/send_queue/mod.rs b/crates/matrix-sdk/src/send_queue/mod.rs index aa23e457ad7..818b4b41b24 100644 --- a/crates/matrix-sdk/src/send_queue/mod.rs +++ b/crates/matrix-sdk/src/send_queue/mod.rs @@ -619,7 +619,7 @@ impl RoomSendQueue { } SentRequestKey::Media(media_info) => { - let _ = updates.send(RoomSendQueueUpdate::UploadedMedia { + let _ = updates.send(RoomSendQueueUpdate::MediaUpload { related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(), file: media_info.file, }); @@ -2129,8 +2129,9 @@ pub enum RoomSendQueueUpdate { event_id: OwnedEventId, }, - /// A media has been successfully uploaded. - UploadedMedia { + /// A media upload (consisting of a file and possibly a thumbnail) has made + /// progress. + MediaUpload { /// The media event this uploaded media relates to. related_to: OwnedTransactionId, diff --git a/crates/matrix-sdk/tests/integration/send_queue.rs b/crates/matrix-sdk/tests/integration/send_queue.rs index b900c74e686..a9c2c14b57b 100644 --- a/crates/matrix-sdk/tests/integration/send_queue.rs +++ b/crates/matrix-sdk/tests/integration/send_queue.rs @@ -164,7 +164,7 @@ macro_rules! assert_update { // Returns a tuple of (transaction_id, send_handle). ($watch:ident => uploaded { related_to = $related_to:expr, mxc = $mxc:expr }) => {{ assert_let!( - Ok(Ok(RoomSendQueueUpdate::UploadedMedia { + Ok(Ok(RoomSendQueueUpdate::MediaUpload { related_to, file, })) = timeout(Duration::from_secs(1), $watch.recv()).await From e4e2796074e7710720896ecf3cfe80f1232aeab8 Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Mon, 7 Jul 2025 19:58:36 +0200 Subject: [PATCH 08/30] feat(send_queue): communicate media upload progress in RoomSendQueueUpdate::MediaUpload Signed-off-by: Johannes Marbach --- crates/matrix-sdk/src/send_queue/mod.rs | 276 +++++++++++++++++- crates/matrix-sdk/src/test_utils/client.rs | 11 + .../tests/integration/send_queue.rs | 62 +++- 3 files changed, 336 insertions(+), 13 deletions(-) diff --git a/crates/matrix-sdk/src/send_queue/mod.rs b/crates/matrix-sdk/src/send_queue/mod.rs index 818b4b41b24..d70b5e112b8 100644 --- a/crates/matrix-sdk/src/send_queue/mod.rs +++ b/crates/matrix-sdk/src/send_queue/mod.rs @@ -177,7 +177,7 @@ use crate::{ config::RequestConfig, error::RetryKind, room::{edit::EditedContent, WeakRoom}, - Client, Media, Room, TransmissionProgress, + AbstractProgress, Client, Media, Room, TransmissionProgress, }; mod upload; @@ -549,7 +549,7 @@ impl RoomSendQueue { locally_enabled: Arc, global_error_reporter: broadcast::Sender, is_dropping: Arc, - _report_media_upload_progress: Arc, + report_media_upload_progress: Arc, ) { trace!("spawned the sending task"); @@ -607,7 +607,21 @@ impl RoomSendQueue { continue; }; - match Self::handle_request(&room, queued_request, cancel_upload_rx, None).await { + // Prepare to watch and communicate the request progress for media uploads. + let media_upload_info = + RoomSendQueue::try_create_media_upload_info(&queued_request, &room, &queue) + .await + .unwrap_or_default(); + let progress = RoomSendQueue::try_create_media_upload_progress_observable( + &report_media_upload_progress, + &media_upload_info, + &related_txn_id, + &updates, + ); + + match Self::handle_request(&room, queued_request, cancel_upload_rx, progress.clone()) + .await + { Ok(Some(parent_key)) => match queue.mark_as_sent(&txn_id, parent_key.clone()).await { Ok(()) => match parent_key { @@ -621,7 +635,17 @@ impl RoomSendQueue { SentRequestKey::Media(media_info) => { let _ = updates.send(RoomSendQueueUpdate::MediaUpload { related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(), - file: media_info.file, + file: Some(media_info.file), + index: media_upload_info.index, + progress: estimate_combined_media_upload_progress( + // The file finished uploading + AbstractProgress { + current: media_upload_info.bytes, + total: media_upload_info.bytes, + }, + media_upload_info.uploaded, + media_upload_info.pending, + ), }); } }, @@ -701,6 +725,162 @@ impl RoomSendQueue { info!("exited sending task"); } + /// Try to create metadata required to compute the progress of a media + /// upload. + async fn try_create_media_upload_info( + queued_request: &QueuedRequest, + room: &Room, + queue: &QueueStorage, + ) -> Option { + let QueuedRequestKind::MediaUpload { + cache_key, + thumbnail_source, + related_to, + #[cfg(feature = "unstable-msc4274")] + accumulated, + .. + } = &queued_request.kind + else { + return None; + }; + + // Determine the item's index, if this is a gallery upload. + let index = { + cfg_if::cfg_if! { + if #[cfg(feature = "unstable-msc4274")] { + accumulated.len() + } else { + 0 // Before MSC4274 only a single file (and thumbnail) could be sent per event. + } + } + }; + + // Get the size of the file being uploaded from the event cache. + let bytes = if let Ok(cache) = room.client().event_cache_store().lock().await { + if let Ok(Some(content)) = cache.get_media_content(cache_key).await { + content.len() + } else { + 0 + } + } else { + 0 + }; + + // If this is a file upload, get the size of any previously uploaded thumbnail + // from the in-memory media sizes cache. + let uploaded = if thumbnail_source.is_some() { + if let Some(sizes) = queue.thumbnail_size_cache.lock().await.get(related_to) { + sizes.get(index).copied().flatten().unwrap_or(0) + } else { + 0 + } + } else { + 0 + }; + + // If this is a thumbnail upload, get the size of the pending file upload from + // the dependent requests. + let pending = RoomSendQueue::get_dependent_pending_file_upload_size( + queued_request.transaction_id.clone(), + queue, + room, + ) + .await; + + Some(MediaUploadInfo { index: index as u64, bytes, uploaded, pending }) + } + + /// Determine the size of a pending file upload, if this is a thumbnail + /// upload or return 0 otherwise. + async fn get_dependent_pending_file_upload_size( + txn_id: OwnedTransactionId, + queue: &QueueStorage, + room: &Room, + ) -> usize { + let guard = queue.store.lock().await; + + let Ok(client) = guard.client() else { + return 0; + }; + + let Ok(dependent_requests) = + client.state_store().load_dependent_queued_requests(room.room_id()).await + else { + return 0; + }; + + let Some((cache_key, parent_is_thumbnail_upload)) = dependent_requests.iter().find_map(|r| { + if r.parent_transaction_id != txn_id { + return None; + } + as_variant!(&r.kind, DependentQueuedRequestKind::UploadFileOrThumbnail { cache_key, parent_is_thumbnail_upload, .. } => (cache_key.clone(), *parent_is_thumbnail_upload)) + }) else { + return 0; + }; + + // If this is not a thumbnail upload, we're uploading a gallery and the + // dependent request is for the next gallery item. + if !parent_is_thumbnail_upload { + return 0; + } + + if let Ok(cache) = room.client().event_cache_store().lock().await { + if let Ok(Some(content)) = cache.get_media_content(&cache_key).await { + content.len() + } else { + 0 + } + } else { + 0 + } + } + + /// Try to creates an observable to watch a media's upload progress. + fn try_create_media_upload_progress_observable( + report_media_upload_progress: &Arc, + media_upload_info: &MediaUploadInfo, + related_txn_id: &Option, + updates: &broadcast::Sender, + ) -> Option> { + if !report_media_upload_progress.load(Ordering::SeqCst) { + return None; + } + + if let Some(related_txn_id) = related_txn_id { + let progress: SharedObservable = Default::default(); + let mut subscriber = progress.subscribe(); + + let report_media_upload_progress = Arc::clone(report_media_upload_progress); + let media_upload_info = media_upload_info.clone(); + let related_to = related_txn_id.clone(); + let updates = updates.clone(); + + // Watch and communicate the progress on a detached background task. Once + // the progress observable is dropped, next() will return None and the + // task will end. + spawn(async move { + while let Some(progress) = subscriber.next().await { + if report_media_upload_progress.load(Ordering::SeqCst) { + let _ = updates.send(RoomSendQueueUpdate::MediaUpload { + related_to: related_to.clone(), + file: None, + index: media_upload_info.index, + progress: estimate_combined_media_upload_progress( + estimate_media_upload_progress(progress, media_upload_info.bytes), + media_upload_info.uploaded, + media_upload_info.pending, + ), + }); + } + } + }); + + Some(progress) + } else { + None + } + } + /// Handles a single request and returns the [`SentRequestKey`] on success /// (unless the request was cancelled, in which case it'll return /// `None`). @@ -851,6 +1031,65 @@ impl RoomSendQueue { } } +/// Estimates the upload progress for a single media file (either a thumbnail or +/// a file). +/// +/// This proportionally maps the upload progress given in actual bytes sent +/// (possibly after encryption) into units of the unencrypted file sizes. +/// +/// # Arguments +/// +/// * `progress` - The [`TransmissionProgress`] of uploading the file (possibly +/// after encryption). +/// +/// * `bytes` - The total number of bytes in the file before encryption. +fn estimate_media_upload_progress( + progress: TransmissionProgress, + bytes: usize, +) -> AbstractProgress { + if progress.total == 0 { + return AbstractProgress { current: 0, total: 0 }; + } + + // Did the file finish uploading? + if progress.current == progress.total { + return AbstractProgress { current: bytes, total: bytes }; + } + + // The file is still uploading. Use the rule of 3 to proportionally map the + // progress into units of the original file size. + AbstractProgress { + current: (progress.current as f64 / progress.total as f64 * bytes as f64).round() as usize, + total: bytes, + } +} + +/// Estimate the combined upload progress across a media file and its +/// thumbnail. +/// +/// # Arguments +/// +/// * `progress` - The progress of uploading the current file mapped into units +/// of the original file size before encryption. +/// +/// * `uploaded` - If this is a media file upload and an associated thumbnail +/// was previously uploaded, the number of bytes in the thumbnail before +/// encryption. Otherwise, zero. +/// +/// * `pending` - If this is a thumbnail upload, the number of bytes in the +/// still to be uploaded associated media file before encryption. Otherwise, +/// zero. +fn estimate_combined_media_upload_progress( + progress: AbstractProgress, + uploaded: usize, + pending: usize, +) -> AbstractProgress { + AbstractProgress { + current: uploaded + progress.current, + total: uploaded + progress.total + pending, + } +} + impl From<&crate::Error> for QueueWedgeError { fn from(value: &crate::Error) -> Self { match value { @@ -921,6 +1160,23 @@ struct BeingSentInfo { cancel_upload: Option>, } +/// Information needed to compute the progress of uploading a media and its +/// associated thumbnail. +#[derive(Clone, Default)] +struct MediaUploadInfo { + /// The index of the uploaded item if this is a gallery upload. Otherwise, + /// zero. + index: u64, + /// The total number of bytes in the file currently being uploaded. + bytes: usize, + /// If the current file is not a thumbnail, the total number of bytes in a + /// previously uploaded thumbnail, if any exists. Otherwise, zero. + uploaded: usize, + /// If the current file is a thumbnail, the total number of bytes in the + /// related media file still to be uploaded. Otherwise, zero. + pending: usize, +} + impl BeingSentInfo { /// Aborts the upload, if a trigger is available. /// @@ -2135,8 +2391,16 @@ pub enum RoomSendQueueUpdate { /// The media event this uploaded media relates to. related_to: OwnedTransactionId, - /// The final media source for the file that was just uploaded. - file: MediaSource, + /// The final media source for the file if it has finished uploading. + file: Option, + + /// The index of the media within the transaction. A file and its + /// thumbnail share the same index. + index: u64, + + /// The combined upload progress across the file and, if existing, its + /// thumbnail. + progress: AbstractProgress, }, } diff --git a/crates/matrix-sdk/src/test_utils/client.rs b/crates/matrix-sdk/src/test_utils/client.rs index e8fffaced2c..586ab10bbca 100644 --- a/crates/matrix-sdk/src/test_utils/client.rs +++ b/crates/matrix-sdk/src/test_utils/client.rs @@ -131,6 +131,17 @@ impl MockClientBuilder { self } + /// Set whether to report media upload progress via the send queue. + pub fn with_enable_send_queue_media_upload_progress_reporting( + mut self, + enable_send_queue_media_upload_progress_reporting: bool, + ) -> Self { + self.builder = self.builder.with_enable_send_queue_media_upload_progress_reporting( + enable_send_queue_media_upload_progress_reporting, + ); + self + } + /// Finish building the client into the final [`Client`] instance. pub async fn build(self) -> Client { let mut builder = self.builder; diff --git a/crates/matrix-sdk/tests/integration/send_queue.rs b/crates/matrix-sdk/tests/integration/send_queue.rs index a9c2c14b57b..f54bb059a8e 100644 --- a/crates/matrix-sdk/tests/integration/send_queue.rs +++ b/crates/matrix-sdk/tests/integration/send_queue.rs @@ -14,7 +14,7 @@ use matrix_sdk::{ RoomSendQueueUpdate, SendHandle, }, test_utils::mocks::{MatrixMock, MatrixMockServer}, - Client, MemoryStore, + AbstractProgress, Client, MemoryStore, }; use matrix_sdk_test::{ async_test, event_factory::EventFactory, InvitedRoomBuilder, KnockedRoomBuilder, @@ -167,14 +167,49 @@ macro_rules! assert_update { Ok(Ok(RoomSendQueueUpdate::MediaUpload { related_to, file, + .. })) = timeout(Duration::from_secs(1), $watch.recv()).await ); assert_eq!(related_to, $related_to); - assert_let!(MediaSource::Plain(mxc) = file); + assert_let!(Some(MediaSource::Plain(mxc)) = file); assert_eq!(mxc, $mxc); }}; + // Check the next stream events communicate upload progress and finally the uploaded media. + // Returns a tuple of (transaction_id, send_handle). + ($watch:ident => uploaded_with_progress { related_to = $related_to:expr, mxc = $mxc:expr, progress_start = $progress_start:expr, progress_end = $progress_end:expr, progress_total = $progress_total:expr }) => {{ + let mut prev_progress: Option = None; + + loop { + assert_let!( + Ok(Ok(RoomSendQueueUpdate::MediaUpload { + related_to, + file, + progress, .. + })) = timeout(Duration::from_secs(1), $watch.recv()).await + ); + + assert_eq!(related_to, $related_to); + + if let Some(progress_start) = $progress_start { + assert!(progress.current >= progress_start); + } + assert!(progress.current <= $progress_end); + assert_eq!(progress.total, $progress_total); + if let Some(prev_progress) = prev_progress { + assert!(progress.current >= prev_progress.current); + } + prev_progress = Some(progress); + + if let Some(MediaSource::Plain(mxc)) = file { + assert_eq!(progress.current, $progress_end); + assert_eq!(mxc, $mxc); + break; + } + } + }}; + // Check the next stream event is a local echo for a reaction with the content $key which // applies to the local echo with transaction id $parent. ($watch:ident => local reaction { key = $key:expr, parent = $parent_txn_id:expr }) => {{ @@ -1792,7 +1827,11 @@ async fn test_media_uploads() { // Mark the room as joined. let room_id = room_id!("!a:b.c"); - let client = mock.client_builder().build().await; + let client = mock + .client_builder() + .with_enable_send_queue_media_upload_progress_reporting(true) + .build() + .await; let room = mock.sync_joined_room(&client, room_id).await; let q = room.send_queue(); @@ -1823,6 +1862,9 @@ async fn test_media_uploads() { ..Default::default() }); + let size_data = data.len(); + let size_thumbnail = thumbnail.data.len(); + let transaction_id = TransactionId::new(); let mentions = Mentions::with_user_ids([owned_user_id!("@ivan:sdk.rs")]); let config = AttachmentConfig::new() @@ -1982,14 +2024,20 @@ async fn test_media_uploads() { assert!(watch.is_empty()); drop(block_upload); - assert_update!(watch => uploaded { + assert_update!(watch => uploaded_with_progress { related_to = transaction_id, - mxc = mxc_uri!("mxc://sdk.rs/thumbnail") + mxc = mxc_uri!("mxc://sdk.rs/thumbnail"), + progress_start = None, + progress_end = size_thumbnail, + progress_total = size_data + size_thumbnail }); - assert_update!(watch => uploaded { + assert_update!(watch => uploaded_with_progress { related_to = transaction_id, - mxc = mxc_uri!("mxc://sdk.rs/media") + mxc = mxc_uri!("mxc://sdk.rs/media"), + progress_start = Some(size_thumbnail), + progress_end = size_data + size_thumbnail, + progress_total = size_data + size_thumbnail }); let edit_msg = assert_update!(watch => edit local echo { From 6a82e29ac0211d6955734fc77e41553ff85d1f7a Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Mon, 7 Jul 2025 19:58:37 +0200 Subject: [PATCH 09/30] feat(timeline): communicate media upload progress through EventSendState::NotSentYet Signed-off-by: Johannes Marbach --- .../controller/decryption_retry_task.rs | 2 +- .../src/timeline/controller/mod.rs | 25 +++- .../timeline/controller/observable_items.rs | 2 +- .../src/timeline/event_handler.rs | 2 +- .../src/timeline/event_item/local.rs | 23 ++- .../src/timeline/event_item/mod.rs | 2 +- crates/matrix-sdk-ui/src/timeline/mod.rs | 10 +- .../matrix-sdk-ui/src/timeline/tests/echo.rs | 7 +- .../tests/integration/timeline/echo.rs | 8 +- .../tests/integration/timeline/edit.rs | 8 +- .../tests/integration/timeline/media.rs | 139 +++++++++++++++--- .../tests/integration/timeline/mod.rs | 4 +- .../tests/integration/timeline/queue.rs | 10 +- .../tests/integration/timeline/reactions.rs | 8 +- .../tests/integration/timeline/replies.rs | 14 +- .../src/tests/timeline.rs | 2 +- 16 files changed, 199 insertions(+), 67 deletions(-) diff --git a/crates/matrix-sdk-ui/src/timeline/controller/decryption_retry_task.rs b/crates/matrix-sdk-ui/src/timeline/controller/decryption_retry_task.rs index 10030684000..0e79422909a 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/decryption_retry_task.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/decryption_retry_task.rs @@ -627,7 +627,7 @@ mod tests { fn local_event() -> Arc { let event_kind = EventTimelineItemKind::Local(LocalEventTimelineItem { - send_state: EventSendState::NotSentYet, + send_state: EventSendState::NotSentYet { progress: None }, transaction_id: OwnedTransactionId::from("trans"), send_handle: None, }); diff --git a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs index a7a2335d7eb..58f96875b71 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs @@ -66,9 +66,9 @@ use super::{ item::TimelineUniqueId, subscriber::TimelineSubscriber, traits::{Decryptor, RoomDataProvider}, - DateDividerMode, EmbeddedEvent, Error, EventSendState, EventTimelineItem, InReplyToDetails, - PaginationError, Profile, TimelineDetails, TimelineEventItemId, TimelineFocus, TimelineItem, - TimelineItemContent, TimelineItemKind, VirtualTimelineItem, + DateDividerMode, EmbeddedEvent, Error, EventSendProgress, EventSendState, EventTimelineItem, + InReplyToDetails, PaginationError, Profile, TimelineDetails, TimelineEventItemId, + TimelineFocus, TimelineItem, TimelineItemContent, TimelineItemKind, VirtualTimelineItem, }; use crate::{ timeline::{ @@ -1031,7 +1031,7 @@ impl TimelineController { warn!("We looked for a local item, but it transitioned as remote??"); return false; }; - prev_local_item.with_send_state(EventSendState::NotSentYet) + prev_local_item.with_send_state(EventSendState::NotSentYet { progress: None }) }; // Replace the local-related state (kind) and the content state. @@ -1309,7 +1309,11 @@ impl TimelineController { } RoomSendQueueUpdate::RetryEvent { transaction_id } => { - self.update_event_send_state(&transaction_id, EventSendState::NotSentYet).await; + self.update_event_send_state( + &transaction_id, + EventSendState::NotSentYet { progress: None }, + ) + .await; } RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => { @@ -1317,9 +1321,14 @@ impl TimelineController { .await; } - RoomSendQueueUpdate::MediaUpload { related_to, .. } => { - // TODO(bnjbvr): Do something else? - info!(txn_id = %related_to, "some media for a media event has been uploaded"); + RoomSendQueueUpdate::MediaUpload { related_to, index, progress, .. } => { + self.update_event_send_state( + &related_to, + EventSendState::NotSentYet { + progress: Some(EventSendProgress::MediaUpload { index, progress }), + }, + ) + .await; } } } diff --git a/crates/matrix-sdk-ui/src/timeline/controller/observable_items.rs b/crates/matrix-sdk-ui/src/timeline/controller/observable_items.rs index 59add7b611f..acf6f6ba75c 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/observable_items.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/observable_items.rs @@ -784,7 +784,7 @@ mod observable_items_tests { thread_summary: None, }), EventTimelineItemKind::Local(LocalEventTimelineItem { - send_state: EventSendState::NotSentYet, + send_state: EventSendState::NotSentYet { progress: None }, transaction_id: transaction_id.into(), send_handle: None, }), diff --git a/crates/matrix-sdk-ui/src/timeline/event_handler.rs b/crates/matrix-sdk-ui/src/timeline/event_handler.rs index b0b202b6809..e4d8ba47b63 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_handler.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_handler.rs @@ -741,7 +741,7 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { let kind: EventTimelineItemKind = match &self.ctx.flow { Flow::Local { txn_id, send_handle } => LocalEventTimelineItem { - send_state: EventSendState::NotSentYet, + send_state: EventSendState::NotSentYet { progress: None }, transaction_id: txn_id.to_owned(), send_handle: send_handle.clone(), } diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/local.rs b/crates/matrix-sdk-ui/src/timeline/event_item/local.rs index 2890b6eab8e..8aa3f9ec8e0 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/local.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/local.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use as_variant::as_variant; -use matrix_sdk::{send_queue::SendHandle, Error}; +use matrix_sdk::{send_queue::SendHandle, AbstractProgress, Error}; use ruma::{EventId, OwnedEventId, OwnedTransactionId}; use super::TimelineEventItemId; @@ -65,7 +65,10 @@ impl LocalEventTimelineItem { #[derive(Clone, Debug)] pub enum EventSendState { /// The local event has not been sent yet. - NotSentYet, + NotSentYet { + /// The progress of the sending operation, if any is available. + progress: Option, + }, /// The local event has been sent to the server, but unsuccessfully: The /// sending has failed. SendingFailed { @@ -84,3 +87,19 @@ pub enum EventSendState { event_id: OwnedEventId, }, } + +/// This type represents the "send progress" of a local event timeline item. +#[derive(Clone, Debug)] +pub enum EventSendProgress { + /// A media (consisting of a file and possibly a thumbnail) is being + /// uploaded. + MediaUpload { + /// The index of the media within the transaction. A file and its + /// thumbnail share the same index. + index: u64, + + /// The combined upload progress across the file and, if existing, its + /// thumbnail. + progress: AbstractProgress, + }, +} diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs index f484803b3bb..ed6b39da773 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs @@ -56,7 +56,7 @@ pub use self::{ PollResult, PollState, RoomMembershipChange, RoomPinnedEventsChange, Sticker, ThreadSummary, TimelineItemContent, }, - local::EventSendState, + local::{EventSendProgress, EventSendState}, }; /// An item in the timeline that represents at least one event. diff --git a/crates/matrix-sdk-ui/src/timeline/mod.rs b/crates/matrix-sdk-ui/src/timeline/mod.rs index aa259badc60..4ea50734ab6 100644 --- a/crates/matrix-sdk-ui/src/timeline/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/mod.rs @@ -91,11 +91,11 @@ pub use self::{ error::*, event_item::{ AnyOtherFullStateEventContent, EmbeddedEvent, EncryptedMessage, EventItemOrigin, - EventSendState, EventTimelineItem, InReplyToDetails, MemberProfileChange, MembershipChange, - Message, MsgLikeContent, MsgLikeKind, OtherState, PollResult, PollState, Profile, - ReactionInfo, ReactionStatus, ReactionsByKeyBySender, RoomMembershipChange, - RoomPinnedEventsChange, Sticker, ThreadSummary, TimelineDetails, TimelineEventItemId, - TimelineItemContent, + EventSendProgress, EventSendState, EventTimelineItem, InReplyToDetails, + MemberProfileChange, MembershipChange, Message, MsgLikeContent, MsgLikeKind, OtherState, + PollResult, PollState, Profile, ReactionInfo, ReactionStatus, ReactionsByKeyBySender, + RoomMembershipChange, RoomPinnedEventsChange, Sticker, ThreadSummary, TimelineDetails, + TimelineEventItemId, TimelineItemContent, }, event_type_filter::TimelineEventTypeFilter, item::{TimelineItem, TimelineItemKind, TimelineUniqueId}, diff --git a/crates/matrix-sdk-ui/src/timeline/tests/echo.rs b/crates/matrix-sdk-ui/src/timeline/tests/echo.rs index dd0fb7864f8..66a4ed05a8d 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/echo.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/echo.rs @@ -50,7 +50,10 @@ async fn test_remote_echo_full_trip() { let item = assert_next_matches!(stream, VectorDiff::PushBack { value } => value); let event_item = item.as_event().unwrap(); assert!(event_item.is_local_echo()); - assert_matches!(event_item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!( + event_item.send_state(), + Some(EventSendState::NotSentYet { progress: None }) + ); assert!(!event_item.can_be_replied_to()); item.unique_id().to_owned() }; @@ -308,7 +311,7 @@ async fn test_no_reuse_of_counters() { let local_id = assert_next_matches_with_timeout!(stream, VectorDiff::PushBack { value: item } => { let event_item = item.as_event().unwrap(); assert!(event_item.is_local_echo()); - assert_matches!(event_item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(event_item.send_state(), Some(EventSendState::NotSentYet { progress: None })); assert!(!event_item.can_be_replied_to()); item.unique_id().to_owned() }); diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs b/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs index 56fb6eb7deb..35eea87c970 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs @@ -68,7 +68,7 @@ async fn test_echo() { assert_let!(VectorDiff::PushBack { value: local_echo } = &timeline_updates[0]); let item = local_echo.as_event().unwrap(); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); assert_let!(Some(msg) = item.content().as_message()); assert_let!(MessageType::Text(text) = msg.msgtype()); assert_eq!(text.body, "Hello, World!"); @@ -150,7 +150,7 @@ async fn test_retry_failed() { // First, local echo is added. assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => { - assert_matches!(value.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(value.send_state(), Some(EventSendState::NotSentYet { progress: None })); }); // Sending fails, because the error is a transient one that's recoverable, @@ -216,7 +216,7 @@ async fn test_dedup_by_event_id_late() { // Timeline: [local echo] assert_let!(VectorDiff::PushBack { value: local_echo } = &timeline_updates[0]); let item = local_echo.as_event().unwrap(); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); // Timeline: [date-divider, local echo] assert_let!(VectorDiff::PushFront { value: date_divider } = &timeline_updates[1]); @@ -283,7 +283,7 @@ async fn test_cancel_failed() { // Local echo is added (immediately) assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => { - assert_matches!(value.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(value.send_state(), Some(EventSendState::NotSentYet { progress: None })); }); // Sending fails, the mock server has no matching route diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/edit.rs b/crates/matrix-sdk-ui/tests/integration/timeline/edit.rs index b5767d4985e..5bbdc5f5406 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/edit.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/edit.rs @@ -199,7 +199,7 @@ async fn test_edit_local_echo() { let internal_id = item.unique_id(); let item = item.as_event().unwrap(); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); assert_let!(VectorDiff::PushFront { value: date_divider } = &timeline_updates[1]); assert!(date_divider.is_date_divider()); @@ -249,7 +249,7 @@ async fn test_edit_local_echo() { assert!(item.is_local_echo()); // The send state has been reset. - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); let edit_message = item.content().as_message().unwrap(); assert_eq!(edit_message.body(), "hello, world"); @@ -635,7 +635,7 @@ async fn test_edit_local_echo_with_unsupported_content() { assert_let!(VectorDiff::PushBack { value: item } = &timeline_updates[0]); let item = item.as_event().unwrap(); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); assert_let!(VectorDiff::PushFront { value: date_divider } = &timeline_updates[1]); assert!(date_divider.is_date_divider()); @@ -689,7 +689,7 @@ async fn test_edit_local_echo_with_unsupported_content() { assert_let!(VectorDiff::PushBack { value: item } = &timeline_updates[0]); let item = item.as_event().unwrap(); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); // Let's edit the local echo (poll start) with an unsupported type (message). let edit_err = timeline diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/media.rs b/crates/matrix-sdk-ui/tests/integration/timeline/media.rs index 25d647e63ce..64ccbca7c50 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/media.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/media.rs @@ -25,9 +25,10 @@ use matrix_sdk::{ attachment::AttachmentConfig, room::reply::{EnforceThread, Reply}, test_utils::mocks::MatrixMockServer, + AbstractProgress, }; use matrix_sdk_test::{async_test, event_factory::EventFactory, JoinedRoomBuilder, ALICE}; -use matrix_sdk_ui::timeline::{AttachmentSource, EventSendState, RoomExt}; +use matrix_sdk_ui::timeline::{AttachmentSource, EventSendProgress, EventSendState, RoomExt}; #[cfg(feature = "unstable-msc4274")] use matrix_sdk_ui::timeline::{GalleryConfig, GalleryItemInfo}; #[cfg(feature = "unstable-msc4274")] @@ -124,7 +125,7 @@ async fn test_send_attachment_from_file() { { assert_let_timeout!(Some(VectorDiff::PushBack { value: item }) = timeline_stream.next()); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); assert_let!(Some(msg) = item.content().as_message()); // Body is the caption, because there's both a caption and filename. @@ -141,6 +142,28 @@ async fn test_send_attachment_from_file() { assert!(aggregated.is_threaded()); } + // The media upload finishes. + { + assert_let_timeout!( + Duration::from_secs(3), + Some(VectorDiff::Set { index: 1, value: item }) = timeline_stream.next() + ); + assert_let!(Some(msg) = item.content().as_message()); + assert_let!( + Some(EventSendState::NotSentYet { + progress: Some(EventSendProgress::MediaUpload { index, progress }) + }) = item.send_state() + ); + assert_eq!(*index, 0); + assert_eq!(progress.current, progress.total); + assert_eq!(get_filename_and_caption(msg.msgtype()), ("test.bin", Some("caption"))); + + // The URI still refers to the local cache. + assert_let!(MessageType::File(file) = msg.msgtype()); + assert_let!(MediaSource::Plain(uri) = &file.source); + assert!(uri.to_string().contains("localhost")); + } + // Eventually, the media is updated with the final MXC IDs… { assert_let_timeout!( @@ -148,7 +171,7 @@ async fn test_send_attachment_from_file() { Some(VectorDiff::Set { index: 1, value: item }) = timeline_stream.next() ); assert_let!(Some(msg) = item.content().as_message()); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); assert_eq!(get_filename_and_caption(msg.msgtype()), ("test.bin", Some("caption"))); // The URI now refers to the final MXC URI. @@ -174,7 +197,11 @@ async fn test_send_attachment_from_file() { #[async_test] async fn test_send_attachment_from_bytes() { let mock = MatrixMockServer::new().await; - let client = mock.client_builder().build().await; + let client = mock + .client_builder() + .with_enable_send_queue_media_upload_progress_reporting(true) + .build() + .await; mock.mock_authenticated_media_config().ok_default().mount().await; mock.mock_room_state_encryption().plain().mount().await; @@ -205,8 +232,9 @@ async fn test_send_attachment_from_bytes() { // The data of the file. let filename = "test.bin"; - let source = - AttachmentSource::Data { bytes: b"hello world".to_vec(), filename: filename.to_owned() }; + let bytes = b"hello world".to_vec(); + let size = bytes.len(); + let source = AttachmentSource::Data { bytes, filename: filename.to_owned() }; // Set up mocks for the file upload. mock.mock_upload() @@ -227,7 +255,7 @@ async fn test_send_attachment_from_bytes() { { assert_let_timeout!(Some(VectorDiff::PushBack { value: item }) = timeline_stream.next()); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); assert_let!(Some(msg) = item.content().as_message()); // Body is the caption, because there's both a caption and filename. @@ -240,20 +268,53 @@ async fn test_send_attachment_from_bytes() { assert!(uri.to_string().contains("localhost")); } - // Eventually, the media is updated with the final MXC IDs… + // The media upload progress is being reported and eventually the upload + // finishes. { - assert_let_timeout!( - Duration::from_secs(3), - Some(VectorDiff::Set { index: 1, value: item }) = timeline_stream.next() - ); - assert_let!(Some(msg) = item.content().as_message()); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); - assert_eq!(get_filename_and_caption(msg.msgtype()), (filename, Some("caption"))); - - // The URI now refers to the final MXC URI. - assert_let!(MessageType::File(file) = msg.msgtype()); - assert_let!(MediaSource::Plain(uri) = &file.source); - assert_eq!(uri.to_string(), "mxc://sdk.rs/media"); + let mut prev_progress: Option = None; + + loop { + assert_let_timeout!( + Duration::from_secs(3), + Some(VectorDiff::Set { index: 1, value: item }) = timeline_stream.next() + ); + + // The caption is still correct. + assert_let!(Some(msg) = item.content().as_message()); + assert_eq!(get_filename_and_caption(msg.msgtype()), (filename, Some("caption"))); + + assert_let!(Some(EventSendState::NotSentYet { progress }) = item.send_state()); + + match progress { + Some(EventSendProgress::MediaUpload { index, progress }) => { + // We're only uploading a single file. + assert_eq!(*index, 0); + + // The progress is reported in units of the unencrypted file size. + assert!(progress.current <= progress.total); + assert_eq!(progress.total, size); + + // The progress only increases. + if let Some(prev_progress) = prev_progress { + assert!(progress.current >= prev_progress.current); + } + prev_progress = Some(*progress); + + // The URI still refers to the local cache. + assert_let!(MessageType::File(file) = msg.msgtype()); + assert_let!(MediaSource::Plain(uri) = &file.source); + assert!(uri.to_string().contains("localhost")); + } + None => { + // The upload finished and the URI now refers to the final MXC URI. + assert_let!(MessageType::File(file) = msg.msgtype()); + assert_let!(MediaSource::Plain(uri) = &file.source); + assert_eq!(uri.to_string(), "mxc://sdk.rs/media"); + + break; + } + } + } } // And eventually the event itself is sent. @@ -341,7 +402,7 @@ async fn test_send_gallery_from_bytes() { { assert_let_timeout!(Some(VectorDiff::PushBack { value: item }) = timeline_stream.next()); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); assert_let!(Some(msg) = item.content().as_message()); // Body matches gallery caption. @@ -361,6 +422,40 @@ async fn test_send_gallery_from_bytes() { assert!(uri.to_string().contains("localhost")); } + // The media upload finishes. + { + assert_let_timeout!( + Duration::from_secs(3), + Some(VectorDiff::Set { index: 1, value: item }) = timeline_stream.next() + ); + assert_let!( + Some(EventSendState::NotSentYet { + progress: Some(EventSendProgress::MediaUpload { index, progress }) + }) = item.send_state() + ); + assert_let!(Some(msg) = item.content().as_message()); + + // The upload has finished. + assert_eq!(*index, 0); + assert_eq!(progress.current, progress.total); + + // Body matches gallery caption. + assert_eq!(msg.body(), "caption"); + + // Message is gallery of expected length + assert_let!(MessageType::Gallery(content) = msg.msgtype()); + assert_eq!(1, content.itemtypes.len()); + assert_let!(GalleryItemType::File(file) = content.itemtypes.first().unwrap()); + + // Item has filename and caption + assert_eq!(filename, file.filename()); + assert_eq!(Some("item caption"), file.caption()); + + // The URI still refers to the local cache. + assert_let!(MediaSource::Plain(uri) = &file.source); + assert!(uri.to_string().contains("localhost")); + } + // Eventually, the media is updated with the final MXC IDs… { assert_let_timeout!( @@ -368,7 +463,7 @@ async fn test_send_gallery_from_bytes() { Some(VectorDiff::Set { index: 1, value: item }) = timeline_stream.next() ); assert_let!(Some(msg) = item.content().as_message()); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); // Message is gallery of expected length assert_let!(MessageType::Gallery(content) = msg.msgtype()); diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/mod.rs b/crates/matrix-sdk-ui/tests/integration/timeline/mod.rs index 284a5f16327..e49355e5115 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/mod.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/mod.rs @@ -243,7 +243,7 @@ async fn test_redact_message() { assert_let!(VectorDiff::PushBack { value: second } = &timeline_updates[0]); let second = second.as_event().unwrap(); - assert_matches!(second.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(second.send_state(), Some(EventSendState::NotSentYet { progress: None })); assert_let!(Some(timeline_updates) = timeline_stream.next().await); assert_eq!(timeline_updates.len(), 1); @@ -297,7 +297,7 @@ async fn test_redact_local_sent_message() { assert_let!(VectorDiff::PushBack { value: item } = &timeline_updates[0]); let event = item.as_event().unwrap(); assert!(event.is_local_echo()); - assert_matches!(event.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(event.send_state(), Some(EventSendState::NotSentYet { progress: None })); // As well as a date divider. assert_let!(VectorDiff::PushFront { value: date_divider } = &timeline_updates[1]); diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs b/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs index 1539f50fc9a..676b4c336bb 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs @@ -379,7 +379,10 @@ async fn test_clear_with_echoes() { // The message that failed to send. assert_matches!(event_items[1].send_state(), Some(EventSendState::SendingFailed { .. })); // The message that is still pending. - assert_matches!(event_items[2].send_state(), Some(EventSendState::NotSentYet)); + assert_matches!( + event_items[2].send_state(), + Some(EventSendState::NotSentYet { progress: None }) + ); // When we clear the timeline now, timeline.clear().await; @@ -390,7 +393,10 @@ async fn test_clear_with_echoes() { assert_eq!(event_items.len(), 2); assert_matches!(event_items[0].send_state(), Some(EventSendState::SendingFailed { .. })); - assert_matches!(event_items[1].send_state(), Some(EventSendState::NotSentYet)); + assert_matches!( + event_items[1].send_state(), + Some(EventSendState::NotSentYet { progress: None }) + ); } #[async_test] diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/reactions.rs b/crates/matrix-sdk-ui/tests/integration/timeline/reactions.rs index 78bbcd36e80..250b93a0ea4 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/reactions.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/reactions.rs @@ -329,7 +329,7 @@ async fn test_local_reaction_to_local_echo() { let item = item.as_event().unwrap(); assert!(item.is_local_echo()); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); assert_eq!(item.content().as_message().unwrap().body(), "lol"); assert!(item.content().reactions().cloned().unwrap_or_default().is_empty()); @@ -355,7 +355,7 @@ async fn test_local_reaction_to_local_echo() { assert_let!(VectorDiff::Set { index: 1, value: item } = &timeline_updates[0]); let item = item.as_event().unwrap(); assert!(item.is_local_echo()); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); let reactions = item.content().reactions().cloned().unwrap_or_default(); assert_eq!(reactions.len(), 1); @@ -377,7 +377,7 @@ async fn test_local_reaction_to_local_echo() { assert_let!(VectorDiff::Set { index: 1, value: item } = &timeline_updates[0]); let item = item.as_event().unwrap(); assert!(item.is_local_echo()); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); let reactions = item.content().reactions().cloned().unwrap_or_default(); assert_eq!(reactions.len(), 2); @@ -398,7 +398,7 @@ async fn test_local_reaction_to_local_echo() { assert_let!(VectorDiff::Set { index: 1, value: item } = &timeline_updates[0]); let item = item.as_event().unwrap(); assert!(item.is_local_echo()); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); let reactions = item.content().reactions().cloned().unwrap_or_default(); assert_eq!(reactions.len(), 1); diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/replies.rs b/crates/matrix-sdk-ui/tests/integration/timeline/replies.rs index 70ce43e91c3..7e5a56ced7a 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/replies.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/replies.rs @@ -719,7 +719,7 @@ async fn test_send_reply() { let reply_item = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value); - assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet { progress: None })); let msglike_reply_message = reply_item.content().as_msglike().unwrap(); let reply_message = reply_item.content().as_message().unwrap(); assert_eq!(reply_message.body(), "Replying to Bob"); @@ -821,7 +821,7 @@ async fn test_send_reply_to_self() { let reply_item = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value); - assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet { progress: None })); let msglike_reply_message = reply_item.content().as_msglike().unwrap(); let reply_message = msglike_reply_message.as_message().unwrap(); assert_eq!(reply_message.body(), "Replying to self"); @@ -886,7 +886,7 @@ async fn test_send_reply_to_threaded() { let reply_item = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value); - assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet { progress: None })); let msglike = reply_item.content().as_msglike().unwrap(); let reply_message = msglike.as_message().unwrap(); @@ -991,7 +991,7 @@ async fn test_send_reply_with_event_id() { let reply_item = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value); - assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet { progress: None })); let msglike_reply_message = reply_item.content().as_msglike().unwrap(); let reply_message = msglike_reply_message.as_message().unwrap(); assert_eq!(reply_message.body(), "Replying to Bob"); @@ -1079,7 +1079,7 @@ async fn test_send_reply_enforce_thread() { let reply_item = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value); - assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet { progress: None })); let msglike_reply_message = reply_item.content().as_msglike().unwrap(); let reply_message = msglike_reply_message.as_message().unwrap(); assert_eq!(reply_message.body(), "Replying to Bob"); @@ -1178,7 +1178,7 @@ async fn test_send_reply_enforce_thread_is_reply() { let reply_item = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value); - assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet { progress: None })); let msglike_reply_message = reply_item.content().as_msglike().unwrap(); let reply_message = msglike_reply_message.as_message().unwrap(); assert_eq!(reply_message.body(), "Replying to Bob"); @@ -1273,7 +1273,7 @@ async fn test_send_reply_with_event_id_that_is_redacted() { let reply_item = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value); - assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet { progress: None })); let msglike_reply_message = reply_item.content().as_msglike().unwrap(); let reply_message = msglike_reply_message.as_message().unwrap(); assert_eq!(reply_message.body(), "Replying to Bob"); diff --git a/testing/matrix-sdk-integration-testing/src/tests/timeline.rs b/testing/matrix-sdk-integration-testing/src/tests/timeline.rs index 24e55e6eb8f..57aae6f458c 100644 --- a/testing/matrix-sdk-integration-testing/src/tests/timeline.rs +++ b/testing/matrix-sdk-integration-testing/src/tests/timeline.rs @@ -282,7 +282,7 @@ async fn test_stale_local_echo_time_abort_edit() { } assert!(local_echo.is_editable()); - assert_matches!(local_echo.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(local_echo.send_state(), Some(EventSendState::NotSentYet { progress: None })); assert_eq!(local_echo.content().as_message().unwrap().body(), "hi!"); let mut has_sender_profile = local_echo.sender_profile().is_ready(); From 5da557d7b5425a1950f02a89db41dcd95a11df6e Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Mon, 7 Jul 2025 19:58:39 +0200 Subject: [PATCH 10/30] feat(ffi): expose media upload progress through EventSendState::NotSentYet Signed-off-by: Johannes Marbach --- bindings/matrix-sdk-ffi/src/client.rs | 24 +++++++++++++ bindings/matrix-sdk-ffi/src/timeline/mod.rs | 40 ++++++++++++++++++--- 2 files changed, 59 insertions(+), 5 deletions(-) diff --git a/bindings/matrix-sdk-ffi/src/client.rs b/bindings/matrix-sdk-ffi/src/client.rs index d818e463d80..71063b8c05a 100644 --- a/bindings/matrix-sdk-ffi/src/client.rs +++ b/bindings/matrix-sdk-ffi/src/client.rs @@ -224,6 +224,30 @@ impl From for TransmissionProgress { } } +/// Progress of an operation in abstract units. +/// +/// Contrary to [`TransmissionProgress`], this allows tracking the progress +/// of sending or receiving a payload in estimated pseudo units representing a +/// percentage. This is helpful in cases where the exact progress in bytes isn't +/// known, for instance, because encryption (which changes the size) happens on +/// the fly. +#[derive(Clone, Copy, uniffi::Record)] +pub struct AbstractProgress { + /// How many units were already transferred. + pub current: u64, + /// How many units there are in total. + pub total: u64, +} + +impl From for AbstractProgress { + fn from(value: matrix_sdk::AbstractProgress) -> Self { + Self { + current: value.current.try_into().unwrap_or(u64::MAX), + total: value.total.try_into().unwrap_or(u64::MAX), + } + } +} + #[derive(uniffi::Object)] pub struct Client { pub(crate) inner: AsyncRuntimeDropped, diff --git a/bindings/matrix-sdk-ffi/src/timeline/mod.rs b/bindings/matrix-sdk-ffi/src/timeline/mod.rs index 641560f8999..88b55b37754 100644 --- a/bindings/matrix-sdk-ffi/src/timeline/mod.rs +++ b/bindings/matrix-sdk-ffi/src/timeline/mod.rs @@ -35,8 +35,8 @@ use matrix_sdk_common::{ stream::StreamExt, }; use matrix_sdk_ui::timeline::{ - self, AttachmentSource, EventItemOrigin, Profile, TimelineDetails, - TimelineUniqueId as SdkTimelineUniqueId, + self, AttachmentSource, EventItemOrigin, EventSendProgress as SdkEventSendProgress, Profile, + TimelineDetails, TimelineUniqueId as SdkTimelineUniqueId, }; use mime::Mime; use reply::{EmbeddedEventDetails, InReplyToDetails}; @@ -66,7 +66,7 @@ use uuid::Uuid; use self::content::TimelineItemContent; pub use self::msg_like::MessageContent; use crate::{ - client::ProgressWatcher, + client::{AbstractProgress, ProgressWatcher}, error::{ClientError, RoomError}, event::EventOrTransactionId, helpers::unwrap_or_clone_arc, @@ -270,6 +270,31 @@ impl TryInto for ReplyParameters { } } +/// This type represents the "send progress" of a local event timeline item. +#[derive(Clone, Copy, uniffi::Enum)] +pub enum EventSendProgress { + /// A media is being uploaded. + MediaUpload { + /// The index of the media within the transaction. A file and its + /// thumbnail share the same index. + index: u64, + + /// The current combined upload progress for both the file and, + /// if it exists, its thumbnail. + progress: AbstractProgress, + }, +} + +impl From for EventSendProgress { + fn from(value: SdkEventSendProgress) -> Self { + match value { + SdkEventSendProgress::MediaUpload { index, progress } => { + Self::MediaUpload { index, progress: progress.into() } + } + } + } +} + #[matrix_sdk_ffi_macros::export] impl Timeline { pub async fn add_listener(&self, listener: Box) -> Arc { @@ -1018,7 +1043,10 @@ impl TimelineItem { #[derive(Clone, uniffi::Enum)] pub enum EventSendState { /// The local event has not been sent yet. - NotSentYet, + NotSentYet { + /// The progress of the sending operation, if any is available. + progress: Option, + }, /// The local event has been sent to the server, but unsuccessfully: The /// sending has failed. @@ -1043,7 +1071,9 @@ impl From<&matrix_sdk_ui::timeline::EventSendState> for EventSendState { use matrix_sdk_ui::timeline::EventSendState::*; match value { - NotSentYet => Self::NotSentYet, + NotSentYet { progress } => { + Self::NotSentYet { progress: progress.clone().map(|p| p.into()) } + } SendingFailed { error, is_recoverable } => { let as_queue_wedge_error: matrix_sdk::QueueWedgeError = (&**error).into(); Self::SendingFailed { From 7ce947b126ca3c4b0806331af82b1dc0aacbd12d Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Wed, 9 Jul 2025 19:14:12 +0200 Subject: [PATCH 11/30] fixup! feat(send_queue): add global setting for sending media progress updates Control send queue progress reporting via the send queue rather than via the client --- bindings/matrix-sdk-ffi/src/client.rs | 6 ++++++ bindings/matrix-sdk-ffi/src/client_builder.rs | 19 +------------------ crates/matrix-sdk/src/client/builder/mod.rs | 17 +---------------- crates/matrix-sdk/src/send_queue/mod.rs | 9 +++++++-- 4 files changed, 15 insertions(+), 36 deletions(-) diff --git a/bindings/matrix-sdk-ffi/src/client.rs b/bindings/matrix-sdk-ffi/src/client.rs index 71063b8c05a..7910c32d24c 100644 --- a/bindings/matrix-sdk-ffi/src/client.rs +++ b/bindings/matrix-sdk-ffi/src/client.rs @@ -563,6 +563,12 @@ impl Client { self.inner.send_queue().set_enabled(enable).await; } + /// Enables or disables progress reporting for media uploads in the send + /// queue. + pub fn enable_send_queue_upload_progress(&self, enable: bool) { + self.inner.send_queue().enable_upload_progress(enable); + } + /// Subscribe to the global enablement status of the send queue, at the /// client-wide level. /// diff --git a/bindings/matrix-sdk-ffi/src/client_builder.rs b/bindings/matrix-sdk-ffi/src/client_builder.rs index a143f4d4fcf..2d814340a56 100644 --- a/bindings/matrix-sdk-ffi/src/client_builder.rs +++ b/bindings/matrix-sdk-ffi/src/client_builder.rs @@ -129,7 +129,6 @@ pub struct ClientBuilder { decryption_settings: DecryptionSettings, enable_share_history_on_invite: bool, request_config: Option, - enable_send_queue_media_upload_progress_reporting: bool, #[cfg(not(target_family = "wasm"))] user_agent: Option, @@ -181,7 +180,6 @@ impl ClientBuilder { enable_share_history_on_invite: false, request_config: Default::default(), threads_enabled: false, - enable_send_queue_media_upload_progress_reporting: false, }) } @@ -388,18 +386,6 @@ impl ClientBuilder { Arc::new(builder) } - /// Set whether to enable progress reporting for media uploads in the send - /// queue. - pub fn enable_send_queue_media_upload_progress_reporting( - self: Arc, - enable_send_queue_media_upload_progress_reporting: bool, - ) -> Arc { - let mut builder = unwrap_or_clone_arc(self); - builder.enable_send_queue_media_upload_progress_reporting = - enable_send_queue_media_upload_progress_reporting; - Arc::new(builder) - } - /// Add a default request config to this client. pub fn request_config(self: Arc, config: RequestConfig) -> Arc { let mut builder = unwrap_or_clone_arc(self); @@ -538,10 +524,7 @@ impl ClientBuilder { .with_encryption_settings(builder.encryption_settings) .with_room_key_recipient_strategy(builder.room_key_recipient_strategy) .with_decryption_settings(builder.decryption_settings) - .with_enable_share_history_on_invite(builder.enable_share_history_on_invite) - .with_enable_send_queue_media_upload_progress_reporting( - builder.enable_send_queue_media_upload_progress_reporting, - ); + .with_enable_share_history_on_invite(builder.enable_share_history_on_invite); match builder.sliding_sync_version_builder { SlidingSyncVersionBuilder::None => { diff --git a/crates/matrix-sdk/src/client/builder/mod.rs b/crates/matrix-sdk/src/client/builder/mod.rs index 250a3cc3480..bbaa84a95f1 100644 --- a/crates/matrix-sdk/src/client/builder/mod.rs +++ b/crates/matrix-sdk/src/client/builder/mod.rs @@ -114,7 +114,6 @@ pub struct ClientBuilder { enable_share_history_on_invite: bool, cross_process_store_locks_holder_name: String, threading_support: ThreadingSupport, - enable_send_queue_media_upload_progress_reporting: bool, } impl ClientBuilder { @@ -146,7 +145,6 @@ impl ClientBuilder { cross_process_store_locks_holder_name: Self::DEFAULT_CROSS_PROCESS_STORE_LOCKS_HOLDER_NAME.to_owned(), threading_support: ThreadingSupport::Disabled, - enable_send_queue_media_upload_progress_reporting: false, } } @@ -491,16 +489,6 @@ impl ClientBuilder { self } - /// Set whether to report media upload progress via the send queue. - pub fn with_enable_send_queue_media_upload_progress_reporting( - mut self, - enable_send_queue_media_upload_progress_reporting: bool, - ) -> Self { - self.enable_send_queue_media_upload_progress_reporting = - enable_send_queue_media_upload_progress_reporting; - self - } - /// Create a [`Client`] with the options set on this builder. /// /// # Errors @@ -585,10 +573,7 @@ impl ClientBuilder { }); // Enable the send queue by default. - let send_queue = Arc::new(SendQueueData::new( - true, - self.enable_send_queue_media_upload_progress_reporting, - )); + let send_queue = Arc::new(SendQueueData::new(true)); let server_info = ClientServerInfo { server_versions: match self.server_versions { diff --git a/crates/matrix-sdk/src/send_queue/mod.rs b/crates/matrix-sdk/src/send_queue/mod.rs index d70b5e112b8..17fe29ec4d1 100644 --- a/crates/matrix-sdk/src/send_queue/mod.rs +++ b/crates/matrix-sdk/src/send_queue/mod.rs @@ -285,6 +285,11 @@ impl SendQueue { self.data().globally_enabled.load(Ordering::SeqCst) } + /// Enable or disable progress reporting for media uploads. + pub fn enable_upload_progress(&self, enabled: bool) { + self.data().report_media_upload_progress.store(enabled, Ordering::SeqCst); + } + /// A subscriber to the enablement status (enabled or disabled) of the /// send queue, along with useful errors. pub fn subscribe_errors(&self) -> broadcast::Receiver { @@ -356,7 +361,7 @@ pub(super) struct SendQueueData { impl SendQueueData { /// Create the data for a send queue, in the given enabled state. - pub fn new(globally_enabled: bool, report_media_upload_progress: bool) -> Self { + pub fn new(globally_enabled: bool) -> Self { let (sender, _) = broadcast::channel(32); Self { @@ -364,7 +369,7 @@ impl SendQueueData { globally_enabled: AtomicBool::new(globally_enabled), error_reporter: sender, is_dropping: Arc::new(false.into()), - report_media_upload_progress: Arc::new(AtomicBool::new(report_media_upload_progress)), + report_media_upload_progress: Arc::new(false.into()), } } } From 80be2d2aaa79c6e271304120dfaefc6fcc8325ce Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Wed, 9 Jul 2025 19:14:14 +0200 Subject: [PATCH 12/30] fixup! feat(send_queue): communicate media upload progress in RoomSendQueueUpdate::MediaUpload Enable upload progress via the send queue rather than via the client builder --- crates/matrix-sdk/src/test_utils/client.rs | 11 ----------- crates/matrix-sdk/tests/integration/send_queue.rs | 7 ++----- 2 files changed, 2 insertions(+), 16 deletions(-) diff --git a/crates/matrix-sdk/src/test_utils/client.rs b/crates/matrix-sdk/src/test_utils/client.rs index 586ab10bbca..e8fffaced2c 100644 --- a/crates/matrix-sdk/src/test_utils/client.rs +++ b/crates/matrix-sdk/src/test_utils/client.rs @@ -131,17 +131,6 @@ impl MockClientBuilder { self } - /// Set whether to report media upload progress via the send queue. - pub fn with_enable_send_queue_media_upload_progress_reporting( - mut self, - enable_send_queue_media_upload_progress_reporting: bool, - ) -> Self { - self.builder = self.builder.with_enable_send_queue_media_upload_progress_reporting( - enable_send_queue_media_upload_progress_reporting, - ); - self - } - /// Finish building the client into the final [`Client`] instance. pub async fn build(self) -> Client { let mut builder = self.builder; diff --git a/crates/matrix-sdk/tests/integration/send_queue.rs b/crates/matrix-sdk/tests/integration/send_queue.rs index f54bb059a8e..c97aff02b3a 100644 --- a/crates/matrix-sdk/tests/integration/send_queue.rs +++ b/crates/matrix-sdk/tests/integration/send_queue.rs @@ -1827,11 +1827,8 @@ async fn test_media_uploads() { // Mark the room as joined. let room_id = room_id!("!a:b.c"); - let client = mock - .client_builder() - .with_enable_send_queue_media_upload_progress_reporting(true) - .build() - .await; + let client = mock.client_builder().build().await; + client.send_queue().enable_upload_progress(true); let room = mock.sync_joined_room(&client, room_id).await; let q = room.send_queue(); From f91be756d600950f654377ae05e155573a8718b5 Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Wed, 9 Jul 2025 19:14:15 +0200 Subject: [PATCH 13/30] fixup! feat(timeline): communicate media upload progress through EventSendState::NotSentYet Enable upload progress via the send queue rather than via the client builder --- crates/matrix-sdk-ui/tests/integration/timeline/media.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/media.rs b/crates/matrix-sdk-ui/tests/integration/timeline/media.rs index 64ccbca7c50..1a9cba61e12 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/media.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/media.rs @@ -197,11 +197,8 @@ async fn test_send_attachment_from_file() { #[async_test] async fn test_send_attachment_from_bytes() { let mock = MatrixMockServer::new().await; - let client = mock - .client_builder() - .with_enable_send_queue_media_upload_progress_reporting(true) - .build() - .await; + let client = mock.client_builder().build().await; + client.send_queue().enable_upload_progress(true); mock.mock_authenticated_media_config().ok_default().mount().await; mock.mock_room_state_encryption().plain().mount().await; From 0f2eb69d3a4e144ac675c12a21c83a75b1a862d0 Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Wed, 9 Jul 2025 19:14:15 +0200 Subject: [PATCH 14/30] fixup! feat(send_queue): enable progress monitoring in RoomSendQueue::handle_request Keep send_progress private --- crates/matrix-sdk/src/encryption/futures.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/matrix-sdk/src/encryption/futures.rs b/crates/matrix-sdk/src/encryption/futures.rs index 8db56bef539..a487ff8ed21 100644 --- a/crates/matrix-sdk/src/encryption/futures.rs +++ b/crates/matrix-sdk/src/encryption/futures.rs @@ -30,7 +30,7 @@ use crate::{config::RequestConfig, Client, Media, Result, TransmissionProgress}; pub struct UploadEncryptedFile<'a, R: ?Sized> { client: &'a Client, reader: &'a mut R, - pub(crate) send_progress: SharedObservable, + send_progress: SharedObservable, request_config: Option, } From 1d1e9a6451c96f9906eef3c262d65bb4396cf906 Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Wed, 9 Jul 2025 19:21:12 +0200 Subject: [PATCH 15/30] fixup! feat(send_queue): communicate media upload progress in RoomSendQueueUpdate::MediaUpload Correct comments of assert_update macros --- crates/matrix-sdk/tests/integration/send_queue.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/matrix-sdk/tests/integration/send_queue.rs b/crates/matrix-sdk/tests/integration/send_queue.rs index c97aff02b3a..b101537b9d8 100644 --- a/crates/matrix-sdk/tests/integration/send_queue.rs +++ b/crates/matrix-sdk/tests/integration/send_queue.rs @@ -161,7 +161,6 @@ macro_rules! assert_update { }}; // Check the next stream event is a notification about an uploaded media. - // Returns a tuple of (transaction_id, send_handle). ($watch:ident => uploaded { related_to = $related_to:expr, mxc = $mxc:expr }) => {{ assert_let!( Ok(Ok(RoomSendQueueUpdate::MediaUpload { @@ -177,7 +176,6 @@ macro_rules! assert_update { }}; // Check the next stream events communicate upload progress and finally the uploaded media. - // Returns a tuple of (transaction_id, send_handle). ($watch:ident => uploaded_with_progress { related_to = $related_to:expr, mxc = $mxc:expr, progress_start = $progress_start:expr, progress_end = $progress_end:expr, progress_total = $progress_total:expr }) => {{ let mut prev_progress: Option = None; From 08a805d06f169f2d1b10b453a9e7ace1318e6f1a Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Wed, 9 Jul 2025 19:27:18 +0200 Subject: [PATCH 16/30] fixup! feat(send_queue): communicate media upload progress in RoomSendQueueUpdate::MediaUpload Split fields of uploaded_with_progress across several lines --- crates/matrix-sdk/tests/integration/send_queue.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/crates/matrix-sdk/tests/integration/send_queue.rs b/crates/matrix-sdk/tests/integration/send_queue.rs index b101537b9d8..7a0bc1141aa 100644 --- a/crates/matrix-sdk/tests/integration/send_queue.rs +++ b/crates/matrix-sdk/tests/integration/send_queue.rs @@ -176,7 +176,13 @@ macro_rules! assert_update { }}; // Check the next stream events communicate upload progress and finally the uploaded media. - ($watch:ident => uploaded_with_progress { related_to = $related_to:expr, mxc = $mxc:expr, progress_start = $progress_start:expr, progress_end = $progress_end:expr, progress_total = $progress_total:expr }) => {{ + ($watch:ident => uploaded_with_progress { + related_to = $related_to:expr, + mxc = $mxc:expr, + progress_start = $progress_start:expr, + progress_end = $progress_end:expr, + progress_total = $progress_total:expr + }) => {{ let mut prev_progress: Option = None; loop { From 66827e18e5fab038175f0a0695bd528703708503 Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Wed, 9 Jul 2025 19:33:45 +0200 Subject: [PATCH 17/30] fixup! feat(send_queue): communicate media upload progress in RoomSendQueueUpdate::MediaUpload Clarify that index will always be zero outside of galleries --- crates/matrix-sdk/src/send_queue/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/matrix-sdk/src/send_queue/mod.rs b/crates/matrix-sdk/src/send_queue/mod.rs index 17fe29ec4d1..fe9d29fe6e3 100644 --- a/crates/matrix-sdk/src/send_queue/mod.rs +++ b/crates/matrix-sdk/src/send_queue/mod.rs @@ -2400,7 +2400,8 @@ pub enum RoomSendQueueUpdate { file: Option, /// The index of the media within the transaction. A file and its - /// thumbnail share the same index. + /// thumbnail share the same index. Will always be 0 for non-gallery + /// media uploads. index: u64, /// The combined upload progress across the file and, if existing, its From 5cb65e1b7ea4812ade0b91adc77a8de29e7979c2 Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Wed, 9 Jul 2025 19:34:12 +0200 Subject: [PATCH 18/30] fixup! feat(timeline): communicate media upload progress through EventSendState::NotSentYet Clarify that index will always be zero outside of galleries --- crates/matrix-sdk-ui/src/timeline/event_item/local.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/local.rs b/crates/matrix-sdk-ui/src/timeline/event_item/local.rs index 8aa3f9ec8e0..ab3bf4dde1f 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/local.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/local.rs @@ -95,7 +95,8 @@ pub enum EventSendProgress { /// uploaded. MediaUpload { /// The index of the media within the transaction. A file and its - /// thumbnail share the same index. + /// thumbnail share the same index. Will always be 0 for non-gallery + /// media uploads. index: u64, /// The combined upload progress across the file and, if existing, its From 8ba3aa62f2891a6bb2f7dda5cfb3c5e817bedf46 Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Wed, 9 Jul 2025 19:34:43 +0200 Subject: [PATCH 19/30] fixup! feat(ffi): expose media upload progress through EventSendState::NotSentYet Clarify that index will always be zero outside of galleries --- bindings/matrix-sdk-ffi/src/timeline/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bindings/matrix-sdk-ffi/src/timeline/mod.rs b/bindings/matrix-sdk-ffi/src/timeline/mod.rs index 88b55b37754..f1c1eb63fcb 100644 --- a/bindings/matrix-sdk-ffi/src/timeline/mod.rs +++ b/bindings/matrix-sdk-ffi/src/timeline/mod.rs @@ -276,7 +276,8 @@ pub enum EventSendProgress { /// A media is being uploaded. MediaUpload { /// The index of the media within the transaction. A file and its - /// thumbnail share the same index. + /// thumbnail share the same index. Will always be 0 for non-gallery + /// media uploads. index: u64, /// The current combined upload progress for both the file and, From 207fc5fb794f91aa2986368e4952cec612f301df Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Wed, 9 Jul 2025 19:39:25 +0200 Subject: [PATCH 20/30] fixup! feat(send_queue): communicate media upload progress in RoomSendQueueUpdate::MediaUpload Also assert the index when asserting upload progress --- crates/matrix-sdk/tests/integration/send_queue.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/matrix-sdk/tests/integration/send_queue.rs b/crates/matrix-sdk/tests/integration/send_queue.rs index 7a0bc1141aa..acc9fdef230 100644 --- a/crates/matrix-sdk/tests/integration/send_queue.rs +++ b/crates/matrix-sdk/tests/integration/send_queue.rs @@ -179,6 +179,7 @@ macro_rules! assert_update { ($watch:ident => uploaded_with_progress { related_to = $related_to:expr, mxc = $mxc:expr, + index = $index:expr, progress_start = $progress_start:expr, progress_end = $progress_end:expr, progress_total = $progress_total:expr @@ -190,11 +191,13 @@ macro_rules! assert_update { Ok(Ok(RoomSendQueueUpdate::MediaUpload { related_to, file, + index, progress, .. })) = timeout(Duration::from_secs(1), $watch.recv()).await ); assert_eq!(related_to, $related_to); + assert_eq!(index, $index); if let Some(progress_start) = $progress_start { assert!(progress.current >= progress_start); @@ -2028,6 +2031,7 @@ async fn test_media_uploads() { assert_update!(watch => uploaded_with_progress { related_to = transaction_id, mxc = mxc_uri!("mxc://sdk.rs/thumbnail"), + index = 0, progress_start = None, progress_end = size_thumbnail, progress_total = size_data + size_thumbnail @@ -2036,6 +2040,7 @@ async fn test_media_uploads() { assert_update!(watch => uploaded_with_progress { related_to = transaction_id, mxc = mxc_uri!("mxc://sdk.rs/media"), + index = 0, progress_start = Some(size_thumbnail), progress_end = size_data + size_thumbnail, progress_total = size_data + size_thumbnail From 50ed24c392391cb3da65d7c2f86274c6de9f06e7 Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Wed, 9 Jul 2025 19:43:06 +0200 Subject: [PATCH 21/30] fixup! feat(send_queue): communicate media upload progress in RoomSendQueueUpdate::MediaUpload Clarify what progress means for gallery uploads --- crates/matrix-sdk/src/send_queue/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/matrix-sdk/src/send_queue/mod.rs b/crates/matrix-sdk/src/send_queue/mod.rs index fe9d29fe6e3..c861dfac1b8 100644 --- a/crates/matrix-sdk/src/send_queue/mod.rs +++ b/crates/matrix-sdk/src/send_queue/mod.rs @@ -2405,7 +2405,8 @@ pub enum RoomSendQueueUpdate { index: u64, /// The combined upload progress across the file and, if existing, its - /// thumbnail. + /// thumbnail. For gallery uploads, the progress is reported per indexed + /// gallery item. progress: AbstractProgress, }, } From c7d1934b9922899ff86ad1a24c6bba38cce24314 Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Wed, 9 Jul 2025 19:43:33 +0200 Subject: [PATCH 22/30] fixup! feat(timeline): communicate media upload progress through EventSendState::NotSentYet Clarify what progress means for gallery uploads --- crates/matrix-sdk-ui/src/timeline/event_item/local.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/local.rs b/crates/matrix-sdk-ui/src/timeline/event_item/local.rs index ab3bf4dde1f..5d3e3fc398d 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/local.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/local.rs @@ -100,7 +100,8 @@ pub enum EventSendProgress { index: u64, /// The combined upload progress across the file and, if existing, its - /// thumbnail. + /// thumbnail. For gallery uploads, the progress is reported per indexed + /// gallery item. progress: AbstractProgress, }, } From 48ccad930dbc00e34458ba596eda74d3c18f0f3e Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Wed, 9 Jul 2025 19:48:33 +0200 Subject: [PATCH 23/30] fixup! feat(send_queue): communicate media upload progress in RoomSendQueueUpdate::MediaUpload Rename uploaded and pending for better clarity --- crates/matrix-sdk/src/send_queue/mod.rs | 43 ++++++++++++++----------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/crates/matrix-sdk/src/send_queue/mod.rs b/crates/matrix-sdk/src/send_queue/mod.rs index c861dfac1b8..c922c18fa92 100644 --- a/crates/matrix-sdk/src/send_queue/mod.rs +++ b/crates/matrix-sdk/src/send_queue/mod.rs @@ -648,8 +648,8 @@ impl RoomSendQueue { current: media_upload_info.bytes, total: media_upload_info.bytes, }, - media_upload_info.uploaded, - media_upload_info.pending, + media_upload_info.uploaded_thumbnail_bytes, + media_upload_info.pending_file_bytes, ), }); } @@ -773,7 +773,7 @@ impl RoomSendQueue { // If this is a file upload, get the size of any previously uploaded thumbnail // from the in-memory media sizes cache. - let uploaded = if thumbnail_source.is_some() { + let uploaded_thumbnail_bytes = if thumbnail_source.is_some() { if let Some(sizes) = queue.thumbnail_size_cache.lock().await.get(related_to) { sizes.get(index).copied().flatten().unwrap_or(0) } else { @@ -785,14 +785,19 @@ impl RoomSendQueue { // If this is a thumbnail upload, get the size of the pending file upload from // the dependent requests. - let pending = RoomSendQueue::get_dependent_pending_file_upload_size( + let pending_file_bytes = RoomSendQueue::get_dependent_pending_file_upload_size( queued_request.transaction_id.clone(), queue, room, ) .await; - Some(MediaUploadInfo { index: index as u64, bytes, uploaded, pending }) + Some(MediaUploadInfo { + index: index as u64, + bytes, + uploaded_thumbnail_bytes, + pending_file_bytes, + }) } /// Determine the size of a pending file upload, if this is a thumbnail @@ -872,8 +877,8 @@ impl RoomSendQueue { index: media_upload_info.index, progress: estimate_combined_media_upload_progress( estimate_media_upload_progress(progress, media_upload_info.bytes), - media_upload_info.uploaded, - media_upload_info.pending, + media_upload_info.uploaded_thumbnail_bytes, + media_upload_info.pending_file_bytes, ), }); } @@ -1077,21 +1082,21 @@ fn estimate_media_upload_progress( /// * `progress` - The progress of uploading the current file mapped into units /// of the original file size before encryption. /// -/// * `uploaded` - If this is a media file upload and an associated thumbnail -/// was previously uploaded, the number of bytes in the thumbnail before -/// encryption. Otherwise, zero. +/// * `uploaded_thumbnail_bytes` - If this is a media file upload and an +/// associated thumbnail was previously uploaded, the number of bytes in the +/// thumbnail before encryption. Otherwise, zero. /// -/// * `pending` - If this is a thumbnail upload, the number of bytes in the -/// still to be uploaded associated media file before encryption. Otherwise, -/// zero. +/// * `pending_file_bytes` - If this is a thumbnail upload, the number of bytes +/// in the still to be uploaded associated media file before encryption. +/// Otherwise, zero. fn estimate_combined_media_upload_progress( progress: AbstractProgress, - uploaded: usize, - pending: usize, + uploaded_thumbnail_bytes: usize, + pending_file_bytes: usize, ) -> AbstractProgress { AbstractProgress { - current: uploaded + progress.current, - total: uploaded + progress.total + pending, + current: uploaded_thumbnail_bytes + progress.current, + total: uploaded_thumbnail_bytes + progress.total + pending_file_bytes, } } @@ -1176,10 +1181,10 @@ struct MediaUploadInfo { bytes: usize, /// If the current file is not a thumbnail, the total number of bytes in a /// previously uploaded thumbnail, if any exists. Otherwise, zero. - uploaded: usize, + uploaded_thumbnail_bytes: usize, /// If the current file is a thumbnail, the total number of bytes in the /// related media file still to be uploaded. Otherwise, zero. - pending: usize, + pending_file_bytes: usize, } impl BeingSentInfo { From 13631d545bf0165fa384bda5c32f8024310d6a47 Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Wed, 9 Jul 2025 19:49:57 +0200 Subject: [PATCH 24/30] fixup! feat(send_queue): communicate media upload progress in RoomSendQueueUpdate::MediaUpload Fix typo --- crates/matrix-sdk/src/send_queue/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/matrix-sdk/src/send_queue/mod.rs b/crates/matrix-sdk/src/send_queue/mod.rs index c922c18fa92..45defb25455 100644 --- a/crates/matrix-sdk/src/send_queue/mod.rs +++ b/crates/matrix-sdk/src/send_queue/mod.rs @@ -845,7 +845,7 @@ impl RoomSendQueue { } } - /// Try to creates an observable to watch a media's upload progress. + /// Try to create an observable to watch a media's upload progress. fn try_create_media_upload_progress_observable( report_media_upload_progress: &Arc, media_upload_info: &MediaUploadInfo, From 50696e1fe299da30fc591b10cc71cb02178f6749 Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Wed, 9 Jul 2025 19:52:59 +0200 Subject: [PATCH 25/30] fixup! feat(send_queue): communicate media upload progress in RoomSendQueueUpdate::MediaUpload Only check if upload progress should be reported before starting uploads --- crates/matrix-sdk/src/send_queue/mod.rs | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/crates/matrix-sdk/src/send_queue/mod.rs b/crates/matrix-sdk/src/send_queue/mod.rs index 45defb25455..edd04cf8712 100644 --- a/crates/matrix-sdk/src/send_queue/mod.rs +++ b/crates/matrix-sdk/src/send_queue/mod.rs @@ -860,7 +860,6 @@ impl RoomSendQueue { let progress: SharedObservable = Default::default(); let mut subscriber = progress.subscribe(); - let report_media_upload_progress = Arc::clone(report_media_upload_progress); let media_upload_info = media_upload_info.clone(); let related_to = related_txn_id.clone(); let updates = updates.clone(); @@ -870,18 +869,16 @@ impl RoomSendQueue { // task will end. spawn(async move { while let Some(progress) = subscriber.next().await { - if report_media_upload_progress.load(Ordering::SeqCst) { - let _ = updates.send(RoomSendQueueUpdate::MediaUpload { - related_to: related_to.clone(), - file: None, - index: media_upload_info.index, - progress: estimate_combined_media_upload_progress( - estimate_media_upload_progress(progress, media_upload_info.bytes), - media_upload_info.uploaded_thumbnail_bytes, - media_upload_info.pending_file_bytes, - ), - }); - } + let _ = updates.send(RoomSendQueueUpdate::MediaUpload { + related_to: related_to.clone(), + file: None, + index: media_upload_info.index, + progress: estimate_combined_media_upload_progress( + estimate_media_upload_progress(progress, media_upload_info.bytes), + media_upload_info.uploaded_thumbnail_bytes, + media_upload_info.pending_file_bytes, + ), + }); } }); From 0e1baace4ac32457b8a67bbf0f9182661264fd58 Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Wed, 9 Jul 2025 19:59:54 +0200 Subject: [PATCH 26/30] fixup! feat(send_queue): communicate media upload progress in RoomSendQueueUpdate::MediaUpload Reuse MediaUploadInfo in function parameters --- crates/matrix-sdk/src/send_queue/mod.rs | 21 ++++++--------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/crates/matrix-sdk/src/send_queue/mod.rs b/crates/matrix-sdk/src/send_queue/mod.rs index edd04cf8712..438db38b62d 100644 --- a/crates/matrix-sdk/src/send_queue/mod.rs +++ b/crates/matrix-sdk/src/send_queue/mod.rs @@ -648,8 +648,7 @@ impl RoomSendQueue { current: media_upload_info.bytes, total: media_upload_info.bytes, }, - media_upload_info.uploaded_thumbnail_bytes, - media_upload_info.pending_file_bytes, + &media_upload_info, ), }); } @@ -875,8 +874,7 @@ impl RoomSendQueue { index: media_upload_info.index, progress: estimate_combined_media_upload_progress( estimate_media_upload_progress(progress, media_upload_info.bytes), - media_upload_info.uploaded_thumbnail_bytes, - media_upload_info.pending_file_bytes, + &media_upload_info, ), }); } @@ -1079,21 +1077,14 @@ fn estimate_media_upload_progress( /// * `progress` - The progress of uploading the current file mapped into units /// of the original file size before encryption. /// -/// * `uploaded_thumbnail_bytes` - If this is a media file upload and an -/// associated thumbnail was previously uploaded, the number of bytes in the -/// thumbnail before encryption. Otherwise, zero. -/// -/// * `pending_file_bytes` - If this is a thumbnail upload, the number of bytes -/// in the still to be uploaded associated media file before encryption. -/// Otherwise, zero. +/// * `info` - Information about the file(s) being uploaded. fn estimate_combined_media_upload_progress( progress: AbstractProgress, - uploaded_thumbnail_bytes: usize, - pending_file_bytes: usize, + info: &MediaUploadInfo, ) -> AbstractProgress { AbstractProgress { - current: uploaded_thumbnail_bytes + progress.current, - total: uploaded_thumbnail_bytes + progress.total + pending_file_bytes, + current: info.uploaded_thumbnail_bytes + progress.current, + total: info.uploaded_thumbnail_bytes + progress.total + info.pending_file_bytes, } } From cf51ac9bd33167be0512476eab5ea135d7d62c98 Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Wed, 9 Jul 2025 20:09:20 +0200 Subject: [PATCH 27/30] fixup! feat(send_queue): cache thumbnail sizes to use them in progress reporting later Expand doc comment for thumbnail_size_cache --- crates/matrix-sdk/src/send_queue/mod.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/crates/matrix-sdk/src/send_queue/mod.rs b/crates/matrix-sdk/src/send_queue/mod.rs index 438db38b62d..aef4c89fb1e 100644 --- a/crates/matrix-sdk/src/send_queue/mod.rs +++ b/crates/matrix-sdk/src/send_queue/mod.rs @@ -1243,6 +1243,16 @@ struct QueueStorage { /// In-memory mapping of media transaction IDs to thumbnail sizes for the /// purpose of progress reporting. + /// + /// The keys are the transaction IDs for sending the media or gallery event + /// after all uploads have finished. This allows us to easily clean up the + /// cache after the event was sent. + /// + /// For media uploads, the value vector will always have a single element. + /// + /// For galleries, some gallery items might not have a thumbnail while + /// others do. Since we access the thumbnails by their index within the + /// gallery, the vector needs to hold optional usize's. thumbnail_size_cache: Arc>>>>, } From 97a553f39bb2ede6451724c716a832e446d6211a Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Wed, 9 Jul 2025 20:17:05 +0200 Subject: [PATCH 28/30] fixup! feat(send_queue): cache thumbnail sizes to use them in progress reporting later Switch to sync mutex --- crates/matrix-sdk/src/send_queue/mod.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/crates/matrix-sdk/src/send_queue/mod.rs b/crates/matrix-sdk/src/send_queue/mod.rs index aef4c89fb1e..2bf403e6a48 100644 --- a/crates/matrix-sdk/src/send_queue/mod.rs +++ b/crates/matrix-sdk/src/send_queue/mod.rs @@ -152,7 +152,10 @@ use matrix_sdk_base::{ store_locks::LockStoreError, RoomState, StoreError, }; -use matrix_sdk_common::executor::{spawn, JoinHandle}; +use matrix_sdk_common::{ + executor::{spawn, JoinHandle}, + locks::Mutex as SyncMutex, +}; use mime::Mime; use ruma::{ events::{ @@ -773,7 +776,7 @@ impl RoomSendQueue { // If this is a file upload, get the size of any previously uploaded thumbnail // from the in-memory media sizes cache. let uploaded_thumbnail_bytes = if thumbnail_source.is_some() { - if let Some(sizes) = queue.thumbnail_size_cache.lock().await.get(related_to) { + if let Some(sizes) = queue.thumbnail_size_cache.lock().get(related_to) { sizes.get(index).copied().flatten().unwrap_or(0) } else { 0 @@ -1253,7 +1256,7 @@ struct QueueStorage { /// For galleries, some gallery items might not have a thumbnail while /// others do. Since we access the thumbnails by their index within the /// gallery, the vector needs to hold optional usize's. - thumbnail_size_cache: Arc>>>>, + thumbnail_size_cache: Arc>>>>, } impl QueueStorage { @@ -1268,7 +1271,7 @@ impl QueueStorage { Self { room_id: room, store: StoreLock { client, being_sent: Default::default() }, - thumbnail_size_cache: Arc::new(Mutex::new(HashMap::new())), + thumbnail_size_cache: Arc::new(SyncMutex::new(HashMap::new())), } } @@ -1426,7 +1429,7 @@ impl QueueStorage { warn!(txn_id = %transaction_id, "request marked as sent was missing from storage"); } - self.thumbnail_size_cache.lock().await.remove(transaction_id); + self.thumbnail_size_cache.lock().remove(transaction_id); Ok(()) } @@ -1468,7 +1471,7 @@ impl QueueStorage { .remove_send_queue_request(&self.room_id, transaction_id) .await?; - self.thumbnail_size_cache.lock().await.remove(transaction_id); + self.thumbnail_size_cache.lock().remove(transaction_id); Ok(removed) } @@ -1561,7 +1564,7 @@ impl QueueStorage { ) .await?; - self.thumbnail_size_cache.lock().await.insert(send_event_txn, media_sizes); + self.thumbnail_size_cache.lock().insert(send_event_txn, media_sizes); Ok(()) } @@ -1691,7 +1694,7 @@ impl QueueStorage { ) .await?; - self.thumbnail_size_cache.lock().await.insert(send_event_txn, media_sizes); + self.thumbnail_size_cache.lock().insert(send_event_txn, media_sizes); Ok(()) } From 971dcdafb5586dd001906444cfe529ce7415db6c Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Thu, 10 Jul 2025 15:05:30 +0200 Subject: [PATCH 29/30] fixup! feat(send_queue): cache thumbnail sizes to use them in progress reporting later Move thumbnail size cache into StoreLock --- crates/matrix-sdk/src/send_queue/mod.rs | 59 ++++++++++++++----------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/crates/matrix-sdk/src/send_queue/mod.rs b/crates/matrix-sdk/src/send_queue/mod.rs index 2bf403e6a48..0eaca99a904 100644 --- a/crates/matrix-sdk/src/send_queue/mod.rs +++ b/crates/matrix-sdk/src/send_queue/mod.rs @@ -152,10 +152,7 @@ use matrix_sdk_base::{ store_locks::LockStoreError, RoomState, StoreError, }; -use matrix_sdk_common::{ - executor::{spawn, JoinHandle}, - locks::Mutex as SyncMutex, -}; +use matrix_sdk_common::executor::{spawn, JoinHandle}; use mime::Mime; use ruma::{ events::{ @@ -1205,6 +1202,20 @@ struct StoreLock { /// /// Also used as the lock to access the state store. being_sent: Arc>>, + + /// In-memory mapping of media transaction IDs to thumbnail sizes for the + /// purpose of progress reporting. + /// + /// The keys are the transaction IDs for sending the media or gallery event + /// after all uploads have finished. This allows us to easily clean up the + /// cache after the event was sent. + /// + /// For media uploads, the value vector will always have a single element. + /// + /// For galleries, some gallery items might not have a thumbnail while + /// others do. Since we access the thumbnails by their index within the + /// gallery, the vector needs to hold optional usize's. + thumbnail_size_cache: Arc>>>>, } impl StoreLock { @@ -1213,6 +1224,7 @@ impl StoreLock { StoreLockGuard { client: self.client.clone(), being_sent: self.being_sent.clone().lock_owned().await, + thumbnail_size_cache: self.thumbnail_size_cache.clone().lock_owned().await, } } } @@ -1226,6 +1238,10 @@ struct StoreLockGuard { /// The one queued request that is being sent at the moment, along with /// associated data that can be useful to act upon it. being_sent: OwnedMutexGuard>, + + /// In-memory mapping of media transaction IDs to thumbnail sizes for the + /// purpose of progress reporting. + thumbnail_size_cache: OwnedMutexGuard>>>, } impl StoreLockGuard { @@ -1243,20 +1259,6 @@ struct QueueStorage { /// To which room is this storage related. room_id: OwnedRoomId, - - /// In-memory mapping of media transaction IDs to thumbnail sizes for the - /// purpose of progress reporting. - /// - /// The keys are the transaction IDs for sending the media or gallery event - /// after all uploads have finished. This allows us to easily clean up the - /// cache after the event was sent. - /// - /// For media uploads, the value vector will always have a single element. - /// - /// For galleries, some gallery items might not have a thumbnail while - /// others do. Since we access the thumbnails by their index within the - /// gallery, the vector needs to hold optional usize's. - thumbnail_size_cache: Arc>>>>, } impl QueueStorage { @@ -1270,8 +1272,11 @@ impl QueueStorage { fn new(client: WeakClient, room: OwnedRoomId) -> Self { Self { room_id: room, - store: StoreLock { client, being_sent: Default::default() }, - thumbnail_size_cache: Arc::new(SyncMutex::new(HashMap::new())), + store: StoreLock { + client, + being_sent: Default::default(), + thumbnail_size_cache: Default::default(), + }, } } @@ -1429,7 +1434,7 @@ impl QueueStorage { warn!(txn_id = %transaction_id, "request marked as sent was missing from storage"); } - self.thumbnail_size_cache.lock().remove(transaction_id); + guard.thumbnail_size_cache.remove(transaction_id); Ok(()) } @@ -1444,7 +1449,7 @@ impl QueueStorage { &self, transaction_id: &TransactionId, ) -> Result { - let guard = self.store.lock().await; + let mut guard = self.store.lock().await; if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref()) == Some(transaction_id) @@ -1471,7 +1476,7 @@ impl QueueStorage { .remove_send_queue_request(&self.room_id, transaction_id) .await?; - self.thumbnail_size_cache.lock().remove(transaction_id); + guard.thumbnail_size_cache.remove(transaction_id); Ok(removed) } @@ -1531,7 +1536,7 @@ impl QueueStorage { file_media_request: MediaRequestParameters, thumbnail: Option, ) -> Result<(), RoomSendQueueStorageError> { - let guard = self.store.lock().await; + let mut guard = self.store.lock().await; let client = guard.client()?; let store = client.state_store(); @@ -1564,7 +1569,7 @@ impl QueueStorage { ) .await?; - self.thumbnail_size_cache.lock().insert(send_event_txn, media_sizes); + guard.thumbnail_size_cache.insert(send_event_txn, media_sizes); Ok(()) } @@ -1581,7 +1586,7 @@ impl QueueStorage { created_at: MilliSecondsSinceUnixEpoch, item_queue_infos: Vec, ) -> Result<(), RoomSendQueueStorageError> { - let guard = self.store.lock().await; + let mut guard = self.store.lock().await; let client = guard.client()?; let store = client.state_store(); @@ -1694,7 +1699,7 @@ impl QueueStorage { ) .await?; - self.thumbnail_size_cache.lock().insert(send_event_txn, media_sizes); + guard.thumbnail_size_cache.insert(send_event_txn, media_sizes); Ok(()) } From ad963c786d8ac590296d9711b7719d7f506cabc0 Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Thu, 10 Jul 2025 15:06:14 +0200 Subject: [PATCH 30/30] fixup! feat(send_queue): communicate media upload progress in RoomSendQueueUpdate::MediaUpload Read cached thumbnail size from StoreLock --- crates/matrix-sdk/src/send_queue/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/matrix-sdk/src/send_queue/mod.rs b/crates/matrix-sdk/src/send_queue/mod.rs index 0eaca99a904..939764c4de2 100644 --- a/crates/matrix-sdk/src/send_queue/mod.rs +++ b/crates/matrix-sdk/src/send_queue/mod.rs @@ -773,7 +773,7 @@ impl RoomSendQueue { // If this is a file upload, get the size of any previously uploaded thumbnail // from the in-memory media sizes cache. let uploaded_thumbnail_bytes = if thumbnail_source.is_some() { - if let Some(sizes) = queue.thumbnail_size_cache.lock().get(related_to) { + if let Some(sizes) = queue.store.lock().await.thumbnail_size_cache.get(related_to) { sizes.get(index).copied().flatten().unwrap_or(0) } else { 0