diff --git a/bindings/matrix-sdk-ffi/src/client.rs b/bindings/matrix-sdk-ffi/src/client.rs index d818e463d80..7910c32d24c 100644 --- a/bindings/matrix-sdk-ffi/src/client.rs +++ b/bindings/matrix-sdk-ffi/src/client.rs @@ -224,6 +224,30 @@ impl From for TransmissionProgress { } } +/// Progress of an operation in abstract units. +/// +/// Contrary to [`TransmissionProgress`], this allows tracking the progress +/// of sending or receiving a payload in estimated pseudo units representing a +/// percentage. This is helpful in cases where the exact progress in bytes isn't +/// known, for instance, because encryption (which changes the size) happens on +/// the fly. +#[derive(Clone, Copy, uniffi::Record)] +pub struct AbstractProgress { + /// How many units were already transferred. + pub current: u64, + /// How many units there are in total. + pub total: u64, +} + +impl From for AbstractProgress { + fn from(value: matrix_sdk::AbstractProgress) -> Self { + Self { + current: value.current.try_into().unwrap_or(u64::MAX), + total: value.total.try_into().unwrap_or(u64::MAX), + } + } +} + #[derive(uniffi::Object)] pub struct Client { pub(crate) inner: AsyncRuntimeDropped, @@ -539,6 +563,12 @@ impl Client { self.inner.send_queue().set_enabled(enable).await; } + /// Enables or disables progress reporting for media uploads in the send + /// queue. + pub fn enable_send_queue_upload_progress(&self, enable: bool) { + self.inner.send_queue().enable_upload_progress(enable); + } + /// Subscribe to the global enablement status of the send queue, at the /// client-wide level. /// diff --git a/bindings/matrix-sdk-ffi/src/timeline/mod.rs b/bindings/matrix-sdk-ffi/src/timeline/mod.rs index 641560f8999..f1c1eb63fcb 100644 --- a/bindings/matrix-sdk-ffi/src/timeline/mod.rs +++ b/bindings/matrix-sdk-ffi/src/timeline/mod.rs @@ -35,8 +35,8 @@ use matrix_sdk_common::{ stream::StreamExt, }; use matrix_sdk_ui::timeline::{ - self, AttachmentSource, EventItemOrigin, Profile, TimelineDetails, - TimelineUniqueId as SdkTimelineUniqueId, + self, AttachmentSource, EventItemOrigin, EventSendProgress as SdkEventSendProgress, Profile, + TimelineDetails, TimelineUniqueId as SdkTimelineUniqueId, }; use mime::Mime; use reply::{EmbeddedEventDetails, InReplyToDetails}; @@ -66,7 +66,7 @@ use uuid::Uuid; use self::content::TimelineItemContent; pub use self::msg_like::MessageContent; use crate::{ - client::ProgressWatcher, + client::{AbstractProgress, ProgressWatcher}, error::{ClientError, RoomError}, event::EventOrTransactionId, helpers::unwrap_or_clone_arc, @@ -270,6 +270,32 @@ impl TryInto for ReplyParameters { } } +/// This type represents the "send progress" of a local event timeline item. +#[derive(Clone, Copy, uniffi::Enum)] +pub enum EventSendProgress { + /// A media is being uploaded. + MediaUpload { + /// The index of the media within the transaction. A file and its + /// thumbnail share the same index. Will always be 0 for non-gallery + /// media uploads. + index: u64, + + /// The current combined upload progress for both the file and, + /// if it exists, its thumbnail. + progress: AbstractProgress, + }, +} + +impl From for EventSendProgress { + fn from(value: SdkEventSendProgress) -> Self { + match value { + SdkEventSendProgress::MediaUpload { index, progress } => { + Self::MediaUpload { index, progress: progress.into() } + } + } + } +} + #[matrix_sdk_ffi_macros::export] impl Timeline { pub async fn add_listener(&self, listener: Box) -> Arc { @@ -1018,7 +1044,10 @@ impl TimelineItem { #[derive(Clone, uniffi::Enum)] pub enum EventSendState { /// The local event has not been sent yet. - NotSentYet, + NotSentYet { + /// The progress of the sending operation, if any is available. + progress: Option, + }, /// The local event has been sent to the server, but unsuccessfully: The /// sending has failed. @@ -1043,7 +1072,9 @@ impl From<&matrix_sdk_ui::timeline::EventSendState> for EventSendState { use matrix_sdk_ui::timeline::EventSendState::*; match value { - NotSentYet => Self::NotSentYet, + NotSentYet { progress } => { + Self::NotSentYet { progress: progress.clone().map(|p| p.into()) } + } SendingFailed { error, is_recoverable } => { let as_queue_wedge_error: matrix_sdk::QueueWedgeError = (&**error).into(); Self::SendingFailed { diff --git a/crates/matrix-sdk-base/src/store/send_queue.rs b/crates/matrix-sdk-base/src/store/send_queue.rs index 877ad0b4b39..17b530d1504 100644 --- a/crates/matrix-sdk-base/src/store/send_queue.rs +++ b/crates/matrix-sdk-base/src/store/send_queue.rs @@ -238,7 +238,6 @@ pub enum DependentQueuedRequestKind { related_to: OwnedTransactionId, /// Whether the depended upon request was a thumbnail or a file upload. - #[cfg(feature = "unstable-msc4274")] #[serde(default = "default_parent_is_thumbnail_upload")] parent_is_thumbnail_upload: bool, }, @@ -275,7 +274,6 @@ pub enum DependentQueuedRequestKind { /// If parent_is_thumbnail_upload is missing, we assume the request is for a /// file upload following a thumbnail upload. This was the only possible case /// before parent_is_thumbnail_upload was introduced. -#[cfg(feature = "unstable-msc4274")] fn default_parent_is_thumbnail_upload() -> bool { true } diff --git a/crates/matrix-sdk-ui/src/timeline/controller/decryption_retry_task.rs b/crates/matrix-sdk-ui/src/timeline/controller/decryption_retry_task.rs index 10030684000..0e79422909a 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/decryption_retry_task.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/decryption_retry_task.rs @@ -627,7 +627,7 @@ mod tests { fn local_event() -> Arc { let event_kind = EventTimelineItemKind::Local(LocalEventTimelineItem { - send_state: EventSendState::NotSentYet, + send_state: EventSendState::NotSentYet { progress: None }, transaction_id: OwnedTransactionId::from("trans"), send_handle: None, }); diff --git a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs index e36228b92d3..227d3df69d3 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs @@ -66,9 +66,9 @@ use super::{ item::TimelineUniqueId, subscriber::TimelineSubscriber, traits::{Decryptor, RoomDataProvider}, - DateDividerMode, EmbeddedEvent, Error, EventSendState, EventTimelineItem, InReplyToDetails, - PaginationError, Profile, TimelineDetails, TimelineEventItemId, TimelineFocus, TimelineItem, - TimelineItemContent, TimelineItemKind, VirtualTimelineItem, + DateDividerMode, EmbeddedEvent, Error, EventSendProgress, EventSendState, EventTimelineItem, + InReplyToDetails, PaginationError, Profile, TimelineDetails, TimelineEventItemId, + TimelineFocus, TimelineItem, TimelineItemContent, TimelineItemKind, VirtualTimelineItem, }; use crate::{ timeline::{ @@ -1068,7 +1068,7 @@ impl TimelineController { warn!("We looked for a local item, but it transitioned as remote??"); return false; }; - prev_local_item.with_send_state(EventSendState::NotSentYet) + prev_local_item.with_send_state(EventSendState::NotSentYet { progress: None }) }; // Replace the local-related state (kind) and the content state. @@ -1352,7 +1352,11 @@ impl TimelineController { } RoomSendQueueUpdate::RetryEvent { transaction_id } => { - self.update_event_send_state(&transaction_id, EventSendState::NotSentYet).await; + self.update_event_send_state( + &transaction_id, + EventSendState::NotSentYet { progress: None }, + ) + .await; } RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => { @@ -1360,9 +1364,14 @@ impl TimelineController { .await; } - RoomSendQueueUpdate::UploadedMedia { related_to, .. } => { - // TODO(bnjbvr): Do something else? - info!(txn_id = %related_to, "some media for a media event has been uploaded"); + RoomSendQueueUpdate::MediaUpload { related_to, index, progress, .. } => { + self.update_event_send_state( + &related_to, + EventSendState::NotSentYet { + progress: Some(EventSendProgress::MediaUpload { index, progress }), + }, + ) + .await; } } } diff --git a/crates/matrix-sdk-ui/src/timeline/controller/observable_items.rs b/crates/matrix-sdk-ui/src/timeline/controller/observable_items.rs index 0ca9a6fe32e..ec8d3c31699 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/observable_items.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/observable_items.rs @@ -784,7 +784,7 @@ mod observable_items_tests { thread_summary: None, }), EventTimelineItemKind::Local(LocalEventTimelineItem { - send_state: EventSendState::NotSentYet, + send_state: EventSendState::NotSentYet { progress: None }, transaction_id: transaction_id.into(), send_handle: None, }), diff --git a/crates/matrix-sdk-ui/src/timeline/event_handler.rs b/crates/matrix-sdk-ui/src/timeline/event_handler.rs index b0b202b6809..e4d8ba47b63 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_handler.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_handler.rs @@ -741,7 +741,7 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { let kind: EventTimelineItemKind = match &self.ctx.flow { Flow::Local { txn_id, send_handle } => LocalEventTimelineItem { - send_state: EventSendState::NotSentYet, + send_state: EventSendState::NotSentYet { progress: None }, transaction_id: txn_id.to_owned(), send_handle: send_handle.clone(), } diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/local.rs b/crates/matrix-sdk-ui/src/timeline/event_item/local.rs index 2890b6eab8e..5d3e3fc398d 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/local.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/local.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use as_variant::as_variant; -use matrix_sdk::{send_queue::SendHandle, Error}; +use matrix_sdk::{send_queue::SendHandle, AbstractProgress, Error}; use ruma::{EventId, OwnedEventId, OwnedTransactionId}; use super::TimelineEventItemId; @@ -65,7 +65,10 @@ impl LocalEventTimelineItem { #[derive(Clone, Debug)] pub enum EventSendState { /// The local event has not been sent yet. - NotSentYet, + NotSentYet { + /// The progress of the sending operation, if any is available. + progress: Option, + }, /// The local event has been sent to the server, but unsuccessfully: The /// sending has failed. SendingFailed { @@ -84,3 +87,21 @@ pub enum EventSendState { event_id: OwnedEventId, }, } + +/// This type represents the "send progress" of a local event timeline item. +#[derive(Clone, Debug)] +pub enum EventSendProgress { + /// A media (consisting of a file and possibly a thumbnail) is being + /// uploaded. + MediaUpload { + /// The index of the media within the transaction. A file and its + /// thumbnail share the same index. Will always be 0 for non-gallery + /// media uploads. + index: u64, + + /// The combined upload progress across the file and, if existing, its + /// thumbnail. For gallery uploads, the progress is reported per indexed + /// gallery item. + progress: AbstractProgress, + }, +} diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs index f484803b3bb..ed6b39da773 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs @@ -56,7 +56,7 @@ pub use self::{ PollResult, PollState, RoomMembershipChange, RoomPinnedEventsChange, Sticker, ThreadSummary, TimelineItemContent, }, - local::EventSendState, + local::{EventSendProgress, EventSendState}, }; /// An item in the timeline that represents at least one event. diff --git a/crates/matrix-sdk-ui/src/timeline/mod.rs b/crates/matrix-sdk-ui/src/timeline/mod.rs index 190ea4c8df0..21fb8aad8e0 100644 --- a/crates/matrix-sdk-ui/src/timeline/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/mod.rs @@ -91,11 +91,11 @@ pub use self::{ error::*, event_item::{ AnyOtherFullStateEventContent, EmbeddedEvent, EncryptedMessage, EventItemOrigin, - EventSendState, EventTimelineItem, InReplyToDetails, MemberProfileChange, MembershipChange, - Message, MsgLikeContent, MsgLikeKind, OtherState, PollResult, PollState, Profile, - ReactionInfo, ReactionStatus, ReactionsByKeyBySender, RoomMembershipChange, - RoomPinnedEventsChange, Sticker, ThreadSummary, TimelineDetails, TimelineEventItemId, - TimelineItemContent, + EventSendProgress, EventSendState, EventTimelineItem, InReplyToDetails, + MemberProfileChange, MembershipChange, Message, MsgLikeContent, MsgLikeKind, OtherState, + PollResult, PollState, Profile, ReactionInfo, ReactionStatus, ReactionsByKeyBySender, + RoomMembershipChange, RoomPinnedEventsChange, Sticker, ThreadSummary, TimelineDetails, + TimelineEventItemId, TimelineItemContent, }, event_type_filter::TimelineEventTypeFilter, item::{TimelineItem, TimelineItemKind, TimelineUniqueId}, diff --git a/crates/matrix-sdk-ui/src/timeline/tests/echo.rs b/crates/matrix-sdk-ui/src/timeline/tests/echo.rs index dd0fb7864f8..66a4ed05a8d 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/echo.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/echo.rs @@ -50,7 +50,10 @@ async fn test_remote_echo_full_trip() { let item = assert_next_matches!(stream, VectorDiff::PushBack { value } => value); let event_item = item.as_event().unwrap(); assert!(event_item.is_local_echo()); - assert_matches!(event_item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!( + event_item.send_state(), + Some(EventSendState::NotSentYet { progress: None }) + ); assert!(!event_item.can_be_replied_to()); item.unique_id().to_owned() }; @@ -308,7 +311,7 @@ async fn test_no_reuse_of_counters() { let local_id = assert_next_matches_with_timeout!(stream, VectorDiff::PushBack { value: item } => { let event_item = item.as_event().unwrap(); assert!(event_item.is_local_echo()); - assert_matches!(event_item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(event_item.send_state(), Some(EventSendState::NotSentYet { progress: None })); assert!(!event_item.can_be_replied_to()); item.unique_id().to_owned() }); diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs b/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs index 56fb6eb7deb..35eea87c970 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs @@ -68,7 +68,7 @@ async fn test_echo() { assert_let!(VectorDiff::PushBack { value: local_echo } = &timeline_updates[0]); let item = local_echo.as_event().unwrap(); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); assert_let!(Some(msg) = item.content().as_message()); assert_let!(MessageType::Text(text) = msg.msgtype()); assert_eq!(text.body, "Hello, World!"); @@ -150,7 +150,7 @@ async fn test_retry_failed() { // First, local echo is added. assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => { - assert_matches!(value.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(value.send_state(), Some(EventSendState::NotSentYet { progress: None })); }); // Sending fails, because the error is a transient one that's recoverable, @@ -216,7 +216,7 @@ async fn test_dedup_by_event_id_late() { // Timeline: [local echo] assert_let!(VectorDiff::PushBack { value: local_echo } = &timeline_updates[0]); let item = local_echo.as_event().unwrap(); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); // Timeline: [date-divider, local echo] assert_let!(VectorDiff::PushFront { value: date_divider } = &timeline_updates[1]); @@ -283,7 +283,7 @@ async fn test_cancel_failed() { // Local echo is added (immediately) assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => { - assert_matches!(value.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(value.send_state(), Some(EventSendState::NotSentYet { progress: None })); }); // Sending fails, the mock server has no matching route diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/edit.rs b/crates/matrix-sdk-ui/tests/integration/timeline/edit.rs index b5767d4985e..5bbdc5f5406 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/edit.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/edit.rs @@ -199,7 +199,7 @@ async fn test_edit_local_echo() { let internal_id = item.unique_id(); let item = item.as_event().unwrap(); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); assert_let!(VectorDiff::PushFront { value: date_divider } = &timeline_updates[1]); assert!(date_divider.is_date_divider()); @@ -249,7 +249,7 @@ async fn test_edit_local_echo() { assert!(item.is_local_echo()); // The send state has been reset. - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); let edit_message = item.content().as_message().unwrap(); assert_eq!(edit_message.body(), "hello, world"); @@ -635,7 +635,7 @@ async fn test_edit_local_echo_with_unsupported_content() { assert_let!(VectorDiff::PushBack { value: item } = &timeline_updates[0]); let item = item.as_event().unwrap(); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); assert_let!(VectorDiff::PushFront { value: date_divider } = &timeline_updates[1]); assert!(date_divider.is_date_divider()); @@ -689,7 +689,7 @@ async fn test_edit_local_echo_with_unsupported_content() { assert_let!(VectorDiff::PushBack { value: item } = &timeline_updates[0]); let item = item.as_event().unwrap(); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); // Let's edit the local echo (poll start) with an unsupported type (message). let edit_err = timeline diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/media.rs b/crates/matrix-sdk-ui/tests/integration/timeline/media.rs index 25d647e63ce..1a9cba61e12 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/media.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/media.rs @@ -25,9 +25,10 @@ use matrix_sdk::{ attachment::AttachmentConfig, room::reply::{EnforceThread, Reply}, test_utils::mocks::MatrixMockServer, + AbstractProgress, }; use matrix_sdk_test::{async_test, event_factory::EventFactory, JoinedRoomBuilder, ALICE}; -use matrix_sdk_ui::timeline::{AttachmentSource, EventSendState, RoomExt}; +use matrix_sdk_ui::timeline::{AttachmentSource, EventSendProgress, EventSendState, RoomExt}; #[cfg(feature = "unstable-msc4274")] use matrix_sdk_ui::timeline::{GalleryConfig, GalleryItemInfo}; #[cfg(feature = "unstable-msc4274")] @@ -124,7 +125,7 @@ async fn test_send_attachment_from_file() { { assert_let_timeout!(Some(VectorDiff::PushBack { value: item }) = timeline_stream.next()); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); assert_let!(Some(msg) = item.content().as_message()); // Body is the caption, because there's both a caption and filename. @@ -141,6 +142,28 @@ async fn test_send_attachment_from_file() { assert!(aggregated.is_threaded()); } + // The media upload finishes. + { + assert_let_timeout!( + Duration::from_secs(3), + Some(VectorDiff::Set { index: 1, value: item }) = timeline_stream.next() + ); + assert_let!(Some(msg) = item.content().as_message()); + assert_let!( + Some(EventSendState::NotSentYet { + progress: Some(EventSendProgress::MediaUpload { index, progress }) + }) = item.send_state() + ); + assert_eq!(*index, 0); + assert_eq!(progress.current, progress.total); + assert_eq!(get_filename_and_caption(msg.msgtype()), ("test.bin", Some("caption"))); + + // The URI still refers to the local cache. + assert_let!(MessageType::File(file) = msg.msgtype()); + assert_let!(MediaSource::Plain(uri) = &file.source); + assert!(uri.to_string().contains("localhost")); + } + // Eventually, the media is updated with the final MXC IDs… { assert_let_timeout!( @@ -148,7 +171,7 @@ async fn test_send_attachment_from_file() { Some(VectorDiff::Set { index: 1, value: item }) = timeline_stream.next() ); assert_let!(Some(msg) = item.content().as_message()); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); assert_eq!(get_filename_and_caption(msg.msgtype()), ("test.bin", Some("caption"))); // The URI now refers to the final MXC URI. @@ -175,6 +198,7 @@ async fn test_send_attachment_from_file() { async fn test_send_attachment_from_bytes() { let mock = MatrixMockServer::new().await; let client = mock.client_builder().build().await; + client.send_queue().enable_upload_progress(true); mock.mock_authenticated_media_config().ok_default().mount().await; mock.mock_room_state_encryption().plain().mount().await; @@ -205,8 +229,9 @@ async fn test_send_attachment_from_bytes() { // The data of the file. let filename = "test.bin"; - let source = - AttachmentSource::Data { bytes: b"hello world".to_vec(), filename: filename.to_owned() }; + let bytes = b"hello world".to_vec(); + let size = bytes.len(); + let source = AttachmentSource::Data { bytes, filename: filename.to_owned() }; // Set up mocks for the file upload. mock.mock_upload() @@ -227,7 +252,7 @@ async fn test_send_attachment_from_bytes() { { assert_let_timeout!(Some(VectorDiff::PushBack { value: item }) = timeline_stream.next()); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); assert_let!(Some(msg) = item.content().as_message()); // Body is the caption, because there's both a caption and filename. @@ -240,20 +265,53 @@ async fn test_send_attachment_from_bytes() { assert!(uri.to_string().contains("localhost")); } - // Eventually, the media is updated with the final MXC IDs… + // The media upload progress is being reported and eventually the upload + // finishes. { - assert_let_timeout!( - Duration::from_secs(3), - Some(VectorDiff::Set { index: 1, value: item }) = timeline_stream.next() - ); - assert_let!(Some(msg) = item.content().as_message()); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); - assert_eq!(get_filename_and_caption(msg.msgtype()), (filename, Some("caption"))); - - // The URI now refers to the final MXC URI. - assert_let!(MessageType::File(file) = msg.msgtype()); - assert_let!(MediaSource::Plain(uri) = &file.source); - assert_eq!(uri.to_string(), "mxc://sdk.rs/media"); + let mut prev_progress: Option = None; + + loop { + assert_let_timeout!( + Duration::from_secs(3), + Some(VectorDiff::Set { index: 1, value: item }) = timeline_stream.next() + ); + + // The caption is still correct. + assert_let!(Some(msg) = item.content().as_message()); + assert_eq!(get_filename_and_caption(msg.msgtype()), (filename, Some("caption"))); + + assert_let!(Some(EventSendState::NotSentYet { progress }) = item.send_state()); + + match progress { + Some(EventSendProgress::MediaUpload { index, progress }) => { + // We're only uploading a single file. + assert_eq!(*index, 0); + + // The progress is reported in units of the unencrypted file size. + assert!(progress.current <= progress.total); + assert_eq!(progress.total, size); + + // The progress only increases. + if let Some(prev_progress) = prev_progress { + assert!(progress.current >= prev_progress.current); + } + prev_progress = Some(*progress); + + // The URI still refers to the local cache. + assert_let!(MessageType::File(file) = msg.msgtype()); + assert_let!(MediaSource::Plain(uri) = &file.source); + assert!(uri.to_string().contains("localhost")); + } + None => { + // The upload finished and the URI now refers to the final MXC URI. + assert_let!(MessageType::File(file) = msg.msgtype()); + assert_let!(MediaSource::Plain(uri) = &file.source); + assert_eq!(uri.to_string(), "mxc://sdk.rs/media"); + + break; + } + } + } } // And eventually the event itself is sent. @@ -341,7 +399,7 @@ async fn test_send_gallery_from_bytes() { { assert_let_timeout!(Some(VectorDiff::PushBack { value: item }) = timeline_stream.next()); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); assert_let!(Some(msg) = item.content().as_message()); // Body matches gallery caption. @@ -361,6 +419,40 @@ async fn test_send_gallery_from_bytes() { assert!(uri.to_string().contains("localhost")); } + // The media upload finishes. + { + assert_let_timeout!( + Duration::from_secs(3), + Some(VectorDiff::Set { index: 1, value: item }) = timeline_stream.next() + ); + assert_let!( + Some(EventSendState::NotSentYet { + progress: Some(EventSendProgress::MediaUpload { index, progress }) + }) = item.send_state() + ); + assert_let!(Some(msg) = item.content().as_message()); + + // The upload has finished. + assert_eq!(*index, 0); + assert_eq!(progress.current, progress.total); + + // Body matches gallery caption. + assert_eq!(msg.body(), "caption"); + + // Message is gallery of expected length + assert_let!(MessageType::Gallery(content) = msg.msgtype()); + assert_eq!(1, content.itemtypes.len()); + assert_let!(GalleryItemType::File(file) = content.itemtypes.first().unwrap()); + + // Item has filename and caption + assert_eq!(filename, file.filename()); + assert_eq!(Some("item caption"), file.caption()); + + // The URI still refers to the local cache. + assert_let!(MediaSource::Plain(uri) = &file.source); + assert!(uri.to_string().contains("localhost")); + } + // Eventually, the media is updated with the final MXC IDs… { assert_let_timeout!( @@ -368,7 +460,7 @@ async fn test_send_gallery_from_bytes() { Some(VectorDiff::Set { index: 1, value: item }) = timeline_stream.next() ); assert_let!(Some(msg) = item.content().as_message()); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); // Message is gallery of expected length assert_let!(MessageType::Gallery(content) = msg.msgtype()); diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/mod.rs b/crates/matrix-sdk-ui/tests/integration/timeline/mod.rs index 284a5f16327..e49355e5115 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/mod.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/mod.rs @@ -243,7 +243,7 @@ async fn test_redact_message() { assert_let!(VectorDiff::PushBack { value: second } = &timeline_updates[0]); let second = second.as_event().unwrap(); - assert_matches!(second.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(second.send_state(), Some(EventSendState::NotSentYet { progress: None })); assert_let!(Some(timeline_updates) = timeline_stream.next().await); assert_eq!(timeline_updates.len(), 1); @@ -297,7 +297,7 @@ async fn test_redact_local_sent_message() { assert_let!(VectorDiff::PushBack { value: item } = &timeline_updates[0]); let event = item.as_event().unwrap(); assert!(event.is_local_echo()); - assert_matches!(event.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(event.send_state(), Some(EventSendState::NotSentYet { progress: None })); // As well as a date divider. assert_let!(VectorDiff::PushFront { value: date_divider } = &timeline_updates[1]); diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs b/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs index 1539f50fc9a..676b4c336bb 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs @@ -379,7 +379,10 @@ async fn test_clear_with_echoes() { // The message that failed to send. assert_matches!(event_items[1].send_state(), Some(EventSendState::SendingFailed { .. })); // The message that is still pending. - assert_matches!(event_items[2].send_state(), Some(EventSendState::NotSentYet)); + assert_matches!( + event_items[2].send_state(), + Some(EventSendState::NotSentYet { progress: None }) + ); // When we clear the timeline now, timeline.clear().await; @@ -390,7 +393,10 @@ async fn test_clear_with_echoes() { assert_eq!(event_items.len(), 2); assert_matches!(event_items[0].send_state(), Some(EventSendState::SendingFailed { .. })); - assert_matches!(event_items[1].send_state(), Some(EventSendState::NotSentYet)); + assert_matches!( + event_items[1].send_state(), + Some(EventSendState::NotSentYet { progress: None }) + ); } #[async_test] diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/reactions.rs b/crates/matrix-sdk-ui/tests/integration/timeline/reactions.rs index 78bbcd36e80..250b93a0ea4 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/reactions.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/reactions.rs @@ -329,7 +329,7 @@ async fn test_local_reaction_to_local_echo() { let item = item.as_event().unwrap(); assert!(item.is_local_echo()); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); assert_eq!(item.content().as_message().unwrap().body(), "lol"); assert!(item.content().reactions().cloned().unwrap_or_default().is_empty()); @@ -355,7 +355,7 @@ async fn test_local_reaction_to_local_echo() { assert_let!(VectorDiff::Set { index: 1, value: item } = &timeline_updates[0]); let item = item.as_event().unwrap(); assert!(item.is_local_echo()); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); let reactions = item.content().reactions().cloned().unwrap_or_default(); assert_eq!(reactions.len(), 1); @@ -377,7 +377,7 @@ async fn test_local_reaction_to_local_echo() { assert_let!(VectorDiff::Set { index: 1, value: item } = &timeline_updates[0]); let item = item.as_event().unwrap(); assert!(item.is_local_echo()); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); let reactions = item.content().reactions().cloned().unwrap_or_default(); assert_eq!(reactions.len(), 2); @@ -398,7 +398,7 @@ async fn test_local_reaction_to_local_echo() { assert_let!(VectorDiff::Set { index: 1, value: item } = &timeline_updates[0]); let item = item.as_event().unwrap(); assert!(item.is_local_echo()); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None })); let reactions = item.content().reactions().cloned().unwrap_or_default(); assert_eq!(reactions.len(), 1); diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/replies.rs b/crates/matrix-sdk-ui/tests/integration/timeline/replies.rs index 70ce43e91c3..7e5a56ced7a 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/replies.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/replies.rs @@ -719,7 +719,7 @@ async fn test_send_reply() { let reply_item = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value); - assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet { progress: None })); let msglike_reply_message = reply_item.content().as_msglike().unwrap(); let reply_message = reply_item.content().as_message().unwrap(); assert_eq!(reply_message.body(), "Replying to Bob"); @@ -821,7 +821,7 @@ async fn test_send_reply_to_self() { let reply_item = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value); - assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet { progress: None })); let msglike_reply_message = reply_item.content().as_msglike().unwrap(); let reply_message = msglike_reply_message.as_message().unwrap(); assert_eq!(reply_message.body(), "Replying to self"); @@ -886,7 +886,7 @@ async fn test_send_reply_to_threaded() { let reply_item = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value); - assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet { progress: None })); let msglike = reply_item.content().as_msglike().unwrap(); let reply_message = msglike.as_message().unwrap(); @@ -991,7 +991,7 @@ async fn test_send_reply_with_event_id() { let reply_item = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value); - assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet { progress: None })); let msglike_reply_message = reply_item.content().as_msglike().unwrap(); let reply_message = msglike_reply_message.as_message().unwrap(); assert_eq!(reply_message.body(), "Replying to Bob"); @@ -1079,7 +1079,7 @@ async fn test_send_reply_enforce_thread() { let reply_item = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value); - assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet { progress: None })); let msglike_reply_message = reply_item.content().as_msglike().unwrap(); let reply_message = msglike_reply_message.as_message().unwrap(); assert_eq!(reply_message.body(), "Replying to Bob"); @@ -1178,7 +1178,7 @@ async fn test_send_reply_enforce_thread_is_reply() { let reply_item = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value); - assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet { progress: None })); let msglike_reply_message = reply_item.content().as_msglike().unwrap(); let reply_message = msglike_reply_message.as_message().unwrap(); assert_eq!(reply_message.body(), "Replying to Bob"); @@ -1273,7 +1273,7 @@ async fn test_send_reply_with_event_id_that_is_redacted() { let reply_item = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value); - assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet { progress: None })); let msglike_reply_message = reply_item.content().as_msglike().unwrap(); let reply_message = msglike_reply_message.as_message().unwrap(); assert_eq!(reply_message.body(), "Replying to Bob"); diff --git a/crates/matrix-sdk/src/http_client/mod.rs b/crates/matrix-sdk/src/http_client/mod.rs index 77c4bee20c5..6974bf04a3e 100644 --- a/crates/matrix-sdk/src/http_client/mod.rs +++ b/crates/matrix-sdk/src/http_client/mod.rs @@ -240,6 +240,21 @@ pub struct TransmissionProgress { pub total: usize, } +/// Progress of an operation in abstract units. +/// +/// Contrary to [`TransmissionProgress`], this allows tracking the progress +/// of sending or receiving a payload in estimated pseudo units representing a +/// percentage. This is helpful in cases where the exact progress in bytes isn't +/// known, for instance, because encryption (which changes the size) happens on +/// the fly. +#[derive(Clone, Copy, Debug, Default)] +pub struct AbstractProgress { + /// How many units were already transferred. + pub current: usize, + /// How many units there are in total. + pub total: usize, +} + async fn response_to_http_response( mut response: reqwest::Response, ) -> Result, reqwest::Error> { diff --git a/crates/matrix-sdk/src/lib.rs b/crates/matrix-sdk/src/lib.rs index 2680d4cb1e3..fcef3394424 100644 --- a/crates/matrix-sdk/src/lib.rs +++ b/crates/matrix-sdk/src/lib.rs @@ -74,7 +74,7 @@ pub use error::{ Error, HttpError, HttpResult, NotificationSettingsError, RefreshTokenError, Result, RumaApiError, }; -pub use http_client::TransmissionProgress; +pub use http_client::{AbstractProgress, TransmissionProgress}; #[cfg(all(feature = "e2e-encryption", feature = "sqlite"))] pub use matrix_sdk_sqlite::SqliteCryptoStore; #[cfg(feature = "sqlite")] diff --git a/crates/matrix-sdk/src/send_queue/mod.rs b/crates/matrix-sdk/src/send_queue/mod.rs index c100fa1f80a..03a5a4e6d9d 100644 --- a/crates/matrix-sdk/src/send_queue/mod.rs +++ b/crates/matrix-sdk/src/send_queue/mod.rs @@ -138,6 +138,7 @@ use std::{ }; use as_variant::as_variant; +use eyeball::SharedObservable; #[cfg(feature = "unstable-msc4274")] use matrix_sdk_base::store::FinishGalleryItemInfo; use matrix_sdk_base::{ @@ -177,7 +178,7 @@ use crate::{ config::RequestConfig, error::RetryKind, room::{edit::EditedContent, WeakRoom}, - Client, Media, Room, + AbstractProgress, Client, Media, Room, TransmissionProgress, }; mod upload; @@ -248,6 +249,7 @@ impl SendQueue { data.is_dropping.clone(), &self.client, owned_room_id.clone(), + data.report_media_upload_progress.clone(), ); map.insert(owned_room_id, room_q.clone()); @@ -285,6 +287,11 @@ impl SendQueue { self.data().globally_enabled.load(Ordering::SeqCst) } + /// Enable or disable progress reporting for media uploads. + pub fn enable_upload_progress(&self, enabled: bool) { + self.data().report_media_upload_progress.store(enabled, Ordering::SeqCst); + } + /// Subscribe to all updates for all rooms. /// /// Use [`RoomSendQueue::subscribe`] to subscribe to update for a _specific @@ -300,6 +307,23 @@ impl SendQueue { } } +/// Metadata about a thumbnail needed when pushing media uploads to the send +/// queue. +#[derive(Clone, Debug)] +struct QueueThumbnailInfo { + /// Metadata about the thumbnail needed when finishing a media upload. + finish_upload_thumbnail_info: FinishUploadThumbnailInfo, + + /// The parameters for the request to retrieve the thumbnail data. + media_request_parameters: MediaRequestParameters, + + /// The thumbnail's mime type. + content_type: Mime, + + /// The thumbnail's file size in bytes. + file_size: usize, +} + /// A specific room's send queue ran into an error, and it has disabled itself. #[derive(Clone, Debug)] pub struct SendQueueRoomError { @@ -345,6 +369,9 @@ pub(super) struct SendQueueData { /// Are we currently dropping the Client? is_dropping: Arc, + + /// Will media upload progress be reported via send queue updates? + report_media_upload_progress: Arc, } impl SendQueueData { @@ -359,6 +386,7 @@ impl SendQueueData { global_update_sender, error_sender, is_dropping: Arc::new(false.into()), + report_media_upload_progress: Arc::new(false.into()), } } } @@ -407,6 +435,7 @@ impl RoomSendQueue { is_dropping: Arc, client: &Client, room_id: OwnedRoomId, + report_media_upload_progress: Arc, ) -> Self { let (update_sender, _) = broadcast::channel(32); @@ -425,6 +454,7 @@ impl RoomSendQueue { locally_enabled.clone(), global_error_sender, is_dropping, + report_media_upload_progress, )); Self { @@ -548,20 +578,10 @@ impl RoomSendQueue { locally_enabled: Arc, global_error_sender: broadcast::Sender, is_dropping: Arc, + report_media_upload_progress: Arc, ) { trace!("spawned the sending task"); - fn send_update( - global_update_sender: &broadcast::Sender, - update_sender: &broadcast::Sender, - room_id: &RoomId, - update: RoomSendQueueUpdate, - ) { - let _ = update_sender.send(update.clone()); - let _ = - global_update_sender.send(SendQueueUpdate { room_id: room_id.to_owned(), update }); - } - let room_id = room.room_id(); loop { @@ -618,7 +638,23 @@ impl RoomSendQueue { continue; }; - match Self::handle_request(&room, queued_request, cancel_upload_rx).await { + // Prepare to watch and communicate the request progress for media uploads. + let media_upload_info = + RoomSendQueue::try_create_media_upload_info(&queued_request, &room, &queue) + .await + .unwrap_or_default(); + let progress = RoomSendQueue::try_create_media_upload_progress_observable( + &report_media_upload_progress, + &media_upload_info, + &related_txn_id, + room_id, + &global_update_sender, + &update_sender, + ); + + match Self::handle_request(&room, queued_request, cancel_upload_rx, progress.clone()) + .await + { Ok(Some(parent_key)) => match queue.mark_as_sent(&txn_id, parent_key.clone()).await { Ok(()) => match parent_key { @@ -636,9 +672,18 @@ impl RoomSendQueue { &global_update_sender, &update_sender, room_id, - RoomSendQueueUpdate::UploadedMedia { + RoomSendQueueUpdate::MediaUpload { related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(), - file: media_info.file, + file: Some(media_info.file), + index: media_upload_info.index, + progress: estimate_combined_media_upload_progress( + // The file finished uploading + AbstractProgress { + current: media_upload_info.bytes, + total: media_upload_info.bytes, + }, + &media_upload_info, + ), }, ); } @@ -724,6 +769,172 @@ impl RoomSendQueue { info!("exited sending task"); } + /// Try to create metadata required to compute the progress of a media + /// upload. + async fn try_create_media_upload_info( + queued_request: &QueuedRequest, + room: &Room, + queue: &QueueStorage, + ) -> Option { + let QueuedRequestKind::MediaUpload { + cache_key, + thumbnail_source, + related_to, + #[cfg(feature = "unstable-msc4274")] + accumulated, + .. + } = &queued_request.kind + else { + return None; + }; + + // Determine the item's index, if this is a gallery upload. + let index = { + cfg_if::cfg_if! { + if #[cfg(feature = "unstable-msc4274")] { + accumulated.len() + } else { + 0 // Before MSC4274 only a single file (and thumbnail) could be sent per event. + } + } + }; + + // Get the size of the file being uploaded from the event cache. + let bytes = if let Ok(cache) = room.client().event_cache_store().lock().await { + if let Ok(Some(content)) = cache.get_media_content(cache_key).await { + content.len() + } else { + 0 + } + } else { + 0 + }; + + // If this is a file upload, get the size of any previously uploaded thumbnail + // from the in-memory media sizes cache. + let uploaded_thumbnail_bytes = if thumbnail_source.is_some() { + if let Some(sizes) = queue.store.lock().await.thumbnail_size_cache.get(related_to) { + sizes.get(index).copied().flatten().unwrap_or(0) + } else { + 0 + } + } else { + 0 + }; + + // If this is a thumbnail upload, get the size of the pending file upload from + // the dependent requests. + let pending_file_bytes = RoomSendQueue::get_dependent_pending_file_upload_size( + queued_request.transaction_id.clone(), + queue, + room, + ) + .await; + + Some(MediaUploadInfo { + index: index as u64, + bytes, + uploaded_thumbnail_bytes, + pending_file_bytes, + }) + } + + /// Determine the size of a pending file upload, if this is a thumbnail + /// upload or return 0 otherwise. + async fn get_dependent_pending_file_upload_size( + txn_id: OwnedTransactionId, + queue: &QueueStorage, + room: &Room, + ) -> usize { + let guard = queue.store.lock().await; + + let Ok(client) = guard.client() else { + return 0; + }; + + let Ok(dependent_requests) = + client.state_store().load_dependent_queued_requests(room.room_id()).await + else { + return 0; + }; + + let Some((cache_key, parent_is_thumbnail_upload)) = dependent_requests.iter().find_map(|r| { + if r.parent_transaction_id != txn_id { + return None; + } + as_variant!(&r.kind, DependentQueuedRequestKind::UploadFileOrThumbnail { cache_key, parent_is_thumbnail_upload, .. } => (cache_key.clone(), *parent_is_thumbnail_upload)) + }) else { + return 0; + }; + + // If this is not a thumbnail upload, we're uploading a gallery and the + // dependent request is for the next gallery item. + if !parent_is_thumbnail_upload { + return 0; + } + + if let Ok(cache) = room.client().event_cache_store().lock().await { + if let Ok(Some(content)) = cache.get_media_content(&cache_key).await { + content.len() + } else { + 0 + } + } else { + 0 + } + } + + /// Try to create an observable to watch a media's upload progress. + fn try_create_media_upload_progress_observable( + report_media_upload_progress: &Arc, + media_upload_info: &MediaUploadInfo, + related_txn_id: &Option, + room_id: &RoomId, + global_update_sender: &broadcast::Sender, + update_sender: &broadcast::Sender, + ) -> Option> { + if !report_media_upload_progress.load(Ordering::SeqCst) { + return None; + } + + if let Some(related_txn_id) = related_txn_id { + let progress: SharedObservable = Default::default(); + let mut subscriber = progress.subscribe(); + + let media_upload_info = media_upload_info.clone(); + let related_to = related_txn_id.clone(); + let room_id = room_id.to_owned(); + let global_update_sender = global_update_sender.clone(); + let update_sender = update_sender.clone(); + + // Watch and communicate the progress on a detached background task. Once + // the progress observable is dropped, next() will return None and the + // task will end. + spawn(async move { + while let Some(progress) = subscriber.next().await { + send_update( + &global_update_sender, + &update_sender, + &room_id, + RoomSendQueueUpdate::MediaUpload { + related_to: related_to.clone(), + file: None, + index: media_upload_info.index, + progress: estimate_combined_media_upload_progress( + estimate_media_upload_progress(progress, media_upload_info.bytes), + &media_upload_info, + ), + }, + ); + } + }); + + Some(progress) + } else { + None + } + } + /// Handles a single request and returns the [`SentRequestKey`] on success /// (unless the request was cancelled, in which case it'll return /// `None`). @@ -731,6 +942,7 @@ impl RoomSendQueue { room: &Room, request: QueuedRequest, cancel_upload_rx: Option>, + progress: Option>, ) -> Result, crate::Error> { match request.kind { QueuedRequestKind::Event { content } => { @@ -778,18 +990,29 @@ impl RoomSendQueue { let media_source = if room.latest_encryption_state().await?.is_encrypted() { trace!("upload will be encrypted (encrypted room)"); let mut cursor = std::io::Cursor::new(data); - let encrypted_file = room - .client() + let client = room.client(); + let mut req = client .upload_encrypted_file(&mut cursor) - .with_request_config(RequestConfig::short_retry()) - .await?; + .with_request_config(RequestConfig::short_retry()); + + if let Some(progress) = progress { + req = req.with_send_progress_observable(progress); + } + + let encrypted_file = req.await?; MediaSource::Encrypted(Box::new(encrypted_file)) } else { trace!("upload will be in clear text (room without encryption)"); let request_config = RequestConfig::short_retry() .timeout(Media::reasonable_upload_timeout(&data)); - let res = - room.client().media().upload(&mime, data, Some(request_config)).await?; + let mut req = + room.client().media().upload(&mime, data, Some(request_config)); + + if let Some(progress) = progress { + req = req.with_send_progress_observable(progress); + } + + let res = req.await?; MediaSource::Plain(res.content_uri) }; @@ -797,8 +1020,14 @@ impl RoomSendQueue { let media_source = { let request_config = RequestConfig::short_retry() .timeout(Media::reasonable_upload_timeout(&data)); - let res = - room.client().media().upload(&mime, data, Some(request_config)).await?; + let mut req = + room.client().media().upload(&mime, data, Some(request_config)); + + if let Some(progress) = progress { + req = req.with_send_progress_observable(progress); + } + + let res = req.await?; MediaSource::Plain(res.content_uri) }; @@ -867,6 +1096,69 @@ impl RoomSendQueue { } } +/// Emit a send queue update. +fn send_update( + global_update_sender: &broadcast::Sender, + update_sender: &broadcast::Sender, + room_id: &RoomId, + update: RoomSendQueueUpdate, +) { + let _ = update_sender.send(update.clone()); + let _ = global_update_sender.send(SendQueueUpdate { room_id: room_id.to_owned(), update }); +} + +/// Estimates the upload progress for a single media file (either a thumbnail or +/// a file). +/// +/// This proportionally maps the upload progress given in actual bytes sent +/// (possibly after encryption) into units of the unencrypted file sizes. +/// +/// # Arguments +/// +/// * `progress` - The [`TransmissionProgress`] of uploading the file (possibly +/// after encryption). +/// +/// * `bytes` - The total number of bytes in the file before encryption. +fn estimate_media_upload_progress( + progress: TransmissionProgress, + bytes: usize, +) -> AbstractProgress { + if progress.total == 0 { + return AbstractProgress { current: 0, total: 0 }; + } + + // Did the file finish uploading? + if progress.current == progress.total { + return AbstractProgress { current: bytes, total: bytes }; + } + + // The file is still uploading. Use the rule of 3 to proportionally map the + // progress into units of the original file size. + AbstractProgress { + current: (progress.current as f64 / progress.total as f64 * bytes as f64).round() as usize, + total: bytes, + } +} + +/// Estimate the combined upload progress across a media file and its +/// thumbnail. +/// +/// # Arguments +/// +/// * `progress` - The progress of uploading the current file mapped into units +/// of the original file size before encryption. +/// +/// * `info` - Information about the file(s) being uploaded. +fn estimate_combined_media_upload_progress( + progress: AbstractProgress, + info: &MediaUploadInfo, +) -> AbstractProgress { + AbstractProgress { + current: info.uploaded_thumbnail_bytes + progress.current, + total: info.uploaded_thumbnail_bytes + progress.total + info.pending_file_bytes, + } +} + impl From<&crate::Error> for QueueWedgeError { fn from(value: &crate::Error) -> Self { match value { @@ -944,6 +1236,23 @@ struct BeingSentInfo { cancel_upload: Option>, } +/// Information needed to compute the progress of uploading a media and its +/// associated thumbnail. +#[derive(Clone, Default)] +struct MediaUploadInfo { + /// The index of the uploaded item if this is a gallery upload. Otherwise, + /// zero. + index: u64, + /// The total number of bytes in the file currently being uploaded. + bytes: usize, + /// If the current file is not a thumbnail, the total number of bytes in a + /// previously uploaded thumbnail, if any exists. Otherwise, zero. + uploaded_thumbnail_bytes: usize, + /// If the current file is a thumbnail, the total number of bytes in the + /// related media file still to be uploaded. Otherwise, zero. + pending_file_bytes: usize, +} + impl BeingSentInfo { /// Aborts the upload, if a trigger is available. /// @@ -971,6 +1280,20 @@ struct StoreLock { /// /// Also used as the lock to access the state store. being_sent: Arc>>, + + /// In-memory mapping of media transaction IDs to thumbnail sizes for the + /// purpose of progress reporting. + /// + /// The keys are the transaction IDs for sending the media or gallery event + /// after all uploads have finished. This allows us to easily clean up the + /// cache after the event was sent. + /// + /// For media uploads, the value vector will always have a single element. + /// + /// For galleries, some gallery items might not have a thumbnail while + /// others do. Since we access the thumbnails by their index within the + /// gallery, the vector needs to hold optional usize's. + thumbnail_size_cache: Arc>>>>, } impl StoreLock { @@ -979,6 +1302,7 @@ impl StoreLock { StoreLockGuard { client: self.client.clone(), being_sent: self.being_sent.clone().lock_owned().await, + thumbnail_size_cache: self.thumbnail_size_cache.clone().lock_owned().await, } } } @@ -992,6 +1316,10 @@ struct StoreLockGuard { /// The one queued request that is being sent at the moment, along with /// associated data that can be useful to act upon it. being_sent: OwnedMutexGuard>, + + /// In-memory mapping of media transaction IDs to thumbnail sizes for the + /// purpose of progress reporting. + thumbnail_size_cache: OwnedMutexGuard>>>, } impl StoreLockGuard { @@ -1020,7 +1348,14 @@ impl QueueStorage { /// Create a new queue for queuing requests to be sent later. fn new(client: WeakClient, room: OwnedRoomId) -> Self { - Self { room_id: room, store: StoreLock { client, being_sent: Default::default() } } + Self { + room_id: room, + store: StoreLock { + client, + being_sent: Default::default(), + thumbnail_size_cache: Default::default(), + }, + } } /// Push a new event to be sent in the queue, with a default priority of 0. @@ -1177,6 +1512,8 @@ impl QueueStorage { warn!(txn_id = %transaction_id, "request marked as sent was missing from storage"); } + guard.thumbnail_size_cache.remove(transaction_id); + Ok(()) } @@ -1190,7 +1527,7 @@ impl QueueStorage { &self, transaction_id: &TransactionId, ) -> Result { - let guard = self.store.lock().await; + let mut guard = self.store.lock().await; if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref()) == Some(transaction_id) @@ -1217,6 +1554,8 @@ impl QueueStorage { .remove_send_queue_request(&self.room_id, transaction_id) .await?; + guard.thumbnail_size_cache.remove(transaction_id); + Ok(removed) } @@ -1273,12 +1612,14 @@ impl QueueStorage { created_at: MilliSecondsSinceUnixEpoch, upload_file_txn: OwnedTransactionId, file_media_request: MediaRequestParameters, - thumbnail: Option<(FinishUploadThumbnailInfo, MediaRequestParameters, Mime)>, + thumbnail: Option, ) -> Result<(), RoomSendQueueStorageError> { - let guard = self.store.lock().await; + let mut guard = self.store.lock().await; let client = guard.client()?; let store = client.state_store(); + let media_sizes = vec![thumbnail.as_ref().map(|t| t.file_size)]; + let thumbnail_info = self .push_thumbnail_and_media_uploads( store, @@ -1296,7 +1637,7 @@ impl QueueStorage { .save_dependent_queued_request( &self.room_id, &upload_file_txn, - send_event_txn.into(), + send_event_txn.clone().into(), created_at, DependentQueuedRequestKind::FinishUpload { local_echo: Box::new(event), @@ -1306,6 +1647,8 @@ impl QueueStorage { ) .await?; + guard.thumbnail_size_cache.insert(send_event_txn, media_sizes); + Ok(()) } @@ -1321,11 +1664,12 @@ impl QueueStorage { created_at: MilliSecondsSinceUnixEpoch, item_queue_infos: Vec, ) -> Result<(), RoomSendQueueStorageError> { - let guard = self.store.lock().await; + let mut guard = self.store.lock().await; let client = guard.client()?; let store = client.state_store(); let mut finish_item_infos = Vec::with_capacity(item_queue_infos.len()); + let mut media_sizes = Vec::with_capacity(item_queue_infos.len()); let Some((first, rest)) = item_queue_infos.split_first() else { return Ok(()); @@ -1348,6 +1692,7 @@ impl QueueStorage { finish_item_infos .push(FinishGalleryItemInfo { file_upload: upload_file_txn.clone(), thumbnail_info }); + media_sizes.push(thumbnail.as_ref().map(|t| t.file_size)); let mut last_upload_file_txn = upload_file_txn.clone(); @@ -1359,35 +1704,38 @@ impl QueueStorage { thumbnail, } = item_queue_info; - let thumbnail_info = - if let Some((thumbnail_info, thumbnail_media_request, thumbnail_content_type)) = - thumbnail - { - let upload_thumbnail_txn = thumbnail_info.txn.clone(); + let thumbnail_info = if let Some(QueueThumbnailInfo { + finish_upload_thumbnail_info: thumbnail_info, + media_request_parameters: thumbnail_media_request, + content_type: thumbnail_content_type, + .. + }) = thumbnail + { + let upload_thumbnail_txn = thumbnail_info.txn.clone(); - // Save the thumbnail upload request as a dependent request of the last file - // upload. - store - .save_dependent_queued_request( - &self.room_id, - &last_upload_file_txn, - upload_thumbnail_txn.clone().into(), - created_at, - DependentQueuedRequestKind::UploadFileOrThumbnail { - content_type: thumbnail_content_type.to_string(), - cache_key: thumbnail_media_request.clone(), - related_to: send_event_txn.clone(), - parent_is_thumbnail_upload: false, - }, - ) - .await?; + // Save the thumbnail upload request as a dependent request of the last file + // upload. + store + .save_dependent_queued_request( + &self.room_id, + &last_upload_file_txn, + upload_thumbnail_txn.clone().into(), + created_at, + DependentQueuedRequestKind::UploadFileOrThumbnail { + content_type: thumbnail_content_type.to_string(), + cache_key: thumbnail_media_request.clone(), + related_to: send_event_txn.clone(), + parent_is_thumbnail_upload: false, + }, + ) + .await?; - last_upload_file_txn = upload_thumbnail_txn; + last_upload_file_txn = upload_thumbnail_txn; - Some(thumbnail_info) - } else { - None - }; + Some(thumbnail_info) + } else { + None + }; // Save the file upload as a dependent request of the previous upload. store @@ -1409,6 +1757,7 @@ impl QueueStorage { file_upload: upload_file_txn.clone(), thumbnail_info: thumbnail_info.cloned(), }); + media_sizes.push(thumbnail.as_ref().map(|t| t.file_size)); last_upload_file_txn = upload_file_txn.clone(); } @@ -1419,7 +1768,7 @@ impl QueueStorage { .save_dependent_queued_request( &self.room_id, &last_upload_file_txn, - send_event_txn.into(), + send_event_txn.clone().into(), created_at, DependentQueuedRequestKind::FinishGallery { local_echo: Box::new(event), @@ -1428,6 +1777,8 @@ impl QueueStorage { ) .await?; + guard.thumbnail_size_cache.insert(send_event_txn, media_sizes); + Ok(()) } @@ -1445,9 +1796,15 @@ impl QueueStorage { created_at: MilliSecondsSinceUnixEpoch, upload_file_txn: OwnedTransactionId, file_media_request: MediaRequestParameters, - thumbnail: Option<(FinishUploadThumbnailInfo, MediaRequestParameters, Mime)>, + thumbnail: Option, ) -> Result, RoomSendQueueStorageError> { - if let Some((thumbnail_info, thumbnail_media_request, thumbnail_content_type)) = thumbnail { + if let Some(QueueThumbnailInfo { + finish_upload_thumbnail_info: thumbnail_info, + media_request_parameters: thumbnail_media_request, + content_type: thumbnail_content_type, + .. + }) = thumbnail + { let upload_thumbnail_txn = thumbnail_info.txn.clone(); // Save the thumbnail upload request. @@ -1479,7 +1836,6 @@ impl QueueStorage { content_type: content_type.to_string(), cache_key: file_media_request, related_to: send_event_txn, - #[cfg(feature = "unstable-msc4274")] parent_is_thumbnail_upload: true, }, ) @@ -1871,25 +2227,12 @@ impl QueueStorage { content_type, cache_key, related_to, - #[cfg(feature = "unstable-msc4274")] parent_is_thumbnail_upload, } => { let Some(parent_key) = parent_key else { // Not finished yet, we should retry later => false. return Ok(false); }; - let parent_is_thumbnail_upload = { - cfg_if::cfg_if! { - if #[cfg(feature = "unstable-msc4274")] { - parent_is_thumbnail_upload - } else { - // Before parent_is_thumbnail_upload was introduced, the only - // possible usage for this request was a file upload following - // a thumbnail upload. - true - } - } - }; self.handle_dependent_file_or_thumbnail_upload( client, dependent_request.own_transaction_id.into(), @@ -2039,7 +2382,7 @@ struct GalleryItemQueueInfo { content_type: Mime, upload_file_txn: OwnedTransactionId, file_media_request: MediaRequestParameters, - thumbnail: Option<(FinishUploadThumbnailInfo, MediaRequestParameters, Mime)>, + thumbnail: Option, } /// The content of a local echo. @@ -2136,13 +2479,24 @@ pub enum RoomSendQueueUpdate { event_id: OwnedEventId, }, - /// A media has been successfully uploaded. - UploadedMedia { + /// A media upload (consisting of a file and possibly a thumbnail) has made + /// progress. + MediaUpload { /// The media event this uploaded media relates to. related_to: OwnedTransactionId, - /// The final media source for the file that was just uploaded. - file: MediaSource, + /// The final media source for the file if it has finished uploading. + file: Option, + + /// The index of the media within the transaction. A file and its + /// thumbnail share the same index. Will always be 0 for non-gallery + /// media uploads. + index: u64, + + /// The combined upload progress across the file and, if existing, its + /// thumbnail. For gallery uploads, the progress is reported per indexed + /// gallery item. + progress: AbstractProgress, }, } diff --git a/crates/matrix-sdk/src/send_queue/upload.rs b/crates/matrix-sdk/src/send_queue/upload.rs index 0ba73221db2..c12b5d496bb 100644 --- a/crates/matrix-sdk/src/send_queue/upload.rs +++ b/crates/matrix-sdk/src/send_queue/upload.rs @@ -46,7 +46,7 @@ use ruma::{ }; use tracing::{debug, error, instrument, trace, warn, Span}; -use super::{QueueStorage, RoomSendQueue, RoomSendQueueError}; +use super::{QueueStorage, QueueThumbnailInfo, RoomSendQueue, RoomSendQueueError}; use crate::{ attachment::{AttachmentConfig, Thumbnail}, room::edit::update_media_caption, @@ -170,7 +170,7 @@ fn update_gallery_event_after_upload( struct MediaCacheResult { upload_thumbnail_txn: Option, event_thumbnail_info: Option<(MediaSource, Box)>, - queue_thumbnail_info: Option<(FinishUploadThumbnailInfo, MediaRequestParameters, Mime)>, + queue_thumbnail_info: Option, } impl RoomSendQueue { @@ -445,6 +445,7 @@ impl RoomSendQueue { // Create the information required for filling the thumbnail section of the // event. let (data, content_type, thumbnail_info) = thumbnail.into_parts(); + let file_size = data.len(); // Cache thumbnail in the cache store. let thumbnail_media_request = Media::make_local_file_media_request(&txn); @@ -464,11 +465,16 @@ impl RoomSendQueue { thumbnail_media_request.source.clone(), thumbnail_info, )), - queue_thumbnail_info: Some(( - FinishUploadThumbnailInfo { txn, width: None, height: None }, - thumbnail_media_request, + queue_thumbnail_info: Some(QueueThumbnailInfo { + finish_upload_thumbnail_info: FinishUploadThumbnailInfo { + txn, + width: None, + height: None, + }, + media_request_parameters: thumbnail_media_request, content_type, - )), + file_size, + }), }) } else { Ok(Default::default()) diff --git a/crates/matrix-sdk/tests/integration/send_queue.rs b/crates/matrix-sdk/tests/integration/send_queue.rs index 61b0113399b..b6bf3c0e3ff 100644 --- a/crates/matrix-sdk/tests/integration/send_queue.rs +++ b/crates/matrix-sdk/tests/integration/send_queue.rs @@ -14,7 +14,7 @@ use matrix_sdk::{ RoomSendQueueUpdate, SendHandle, SendQueueUpdate, }, test_utils::mocks::{MatrixMock, MatrixMockServer}, - Client, MemoryStore, + AbstractProgress, Client, MemoryStore, }; use matrix_sdk_test::{ async_test, event_factory::EventFactory, InvitedRoomBuilder, KnockedRoomBuilder, @@ -162,21 +162,65 @@ macro_rules! assert_update { }}; // Check the next stream event is a notification about an uploaded media. - // Returns a tuple of (transaction_id, send_handle). (($global_watch:ident, $watch:ident) => uploaded { related_to = $related_to:expr, mxc = $mxc:expr }) => {{ assert_let!( - Ok(Ok(RoomSendQueueUpdate::UploadedMedia { + Ok(Ok(RoomSendQueueUpdate::MediaUpload { related_to, file, + .. })) = timeout(Duration::from_secs(1), $watch.recv()).await ); - assert_matches!($global_watch.recv().await, Ok(SendQueueUpdate { update: RoomSendQueueUpdate::UploadedMedia { .. }, .. })); + assert_matches!($global_watch.recv().await, Ok(SendQueueUpdate { update: RoomSendQueueUpdate::MediaUpload { .. }, .. })); assert_eq!(related_to, $related_to); - assert_let!(MediaSource::Plain(mxc) = file); + assert_let!(Some(MediaSource::Plain(mxc)) = file); assert_eq!(mxc, $mxc); }}; + // Check the next stream events communicate upload progress and finally the uploaded media. + (($global_watch:ident, $watch:ident) => uploaded_with_progress { + related_to = $related_to:expr, + mxc = $mxc:expr, + index = $index:expr, + progress_start = $progress_start:expr, + progress_end = $progress_end:expr, + progress_total = $progress_total:expr + }) => {{ + let mut prev_progress: Option = None; + + loop { + assert_let!( + Ok(Ok(RoomSendQueueUpdate::MediaUpload { + related_to, + file, + index, + progress, .. + })) = timeout(Duration::from_secs(1), $watch.recv()).await + ); + assert_matches!($global_watch.recv().await, Ok(SendQueueUpdate { update: RoomSendQueueUpdate::MediaUpload { .. }, .. })); + + + assert_eq!(related_to, $related_to); + assert_eq!(index, $index); + + if let Some(progress_start) = $progress_start { + assert!(progress.current >= progress_start); + } + assert!(progress.current <= $progress_end); + assert_eq!(progress.total, $progress_total); + if let Some(prev_progress) = prev_progress { + assert!(progress.current >= prev_progress.current); + } + prev_progress = Some(progress); + + if let Some(MediaSource::Plain(mxc)) = file { + assert_eq!(progress.current, $progress_end); + assert_eq!(mxc, $mxc); + break; + } + } + }}; + // Check the next stream event is a local echo for a reaction with the content $key which // applies to the local echo with transaction id $parent. (($global_watch:ident, $watch:ident) => local reaction { key = $key:expr, parent = $parent_txn_id:expr }) => {{ @@ -1838,6 +1882,7 @@ async fn test_media_uploads() { // Mark the room as joined. let room_id = room_id!("!a:b.c"); let client = mock.client_builder().build().await; + client.send_queue().enable_upload_progress(true); let room = mock.sync_joined_room(&client, room_id).await; let q = room.send_queue(); @@ -1869,6 +1914,9 @@ async fn test_media_uploads() { ..Default::default() }); + let size_data = data.len(); + let size_thumbnail = thumbnail.data.len(); + let transaction_id = TransactionId::new(); let mentions = Mentions::with_user_ids([owned_user_id!("@ivan:sdk.rs")]); let config = AttachmentConfig::new() @@ -2028,14 +2076,22 @@ async fn test_media_uploads() { assert!(watch.is_empty()); drop(block_upload); - assert_update!((global_watch, watch) => uploaded { + assert_update!((global_watch, watch) => uploaded_with_progress { related_to = transaction_id, - mxc = mxc_uri!("mxc://sdk.rs/thumbnail") + mxc = mxc_uri!("mxc://sdk.rs/thumbnail"), + index = 0, + progress_start = None, + progress_end = size_thumbnail, + progress_total = size_data + size_thumbnail }); - assert_update!((global_watch, watch) => uploaded { + assert_update!((global_watch, watch) => uploaded_with_progress { related_to = transaction_id, - mxc = mxc_uri!("mxc://sdk.rs/media") + mxc = mxc_uri!("mxc://sdk.rs/media"), + index = 0, + progress_start = Some(size_thumbnail), + progress_end = size_data + size_thumbnail, + progress_total = size_data + size_thumbnail }); let edit_msg = assert_update!((global_watch, watch) => edit local echo { diff --git a/testing/matrix-sdk-integration-testing/src/tests/timeline.rs b/testing/matrix-sdk-integration-testing/src/tests/timeline.rs index 24e55e6eb8f..57aae6f458c 100644 --- a/testing/matrix-sdk-integration-testing/src/tests/timeline.rs +++ b/testing/matrix-sdk-integration-testing/src/tests/timeline.rs @@ -282,7 +282,7 @@ async fn test_stale_local_echo_time_abort_edit() { } assert!(local_echo.is_editable()); - assert_matches!(local_echo.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(local_echo.send_state(), Some(EventSendState::NotSentYet { progress: None })); assert_eq!(local_echo.content().as_message().unwrap().body(), "hi!"); let mut has_sender_profile = local_echo.sender_profile().is_ready();