Skip to content

Commit e4e2796

Browse files
committed
feat(send_queue): communicate media upload progress in RoomSendQueueUpdate::MediaUpload
Signed-off-by: Johannes Marbach <n0-0ne+github@mailbox.org>
1 parent 5c0bb6a commit e4e2796

File tree

3 files changed

+336
-13
lines changed

3 files changed

+336
-13
lines changed

crates/matrix-sdk/src/send_queue/mod.rs

Lines changed: 270 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ use crate::{
177177
config::RequestConfig,
178178
error::RetryKind,
179179
room::{edit::EditedContent, WeakRoom},
180-
Client, Media, Room, TransmissionProgress,
180+
AbstractProgress, Client, Media, Room, TransmissionProgress,
181181
};
182182

183183
mod upload;
@@ -549,7 +549,7 @@ impl RoomSendQueue {
549549
locally_enabled: Arc<AtomicBool>,
550550
global_error_reporter: broadcast::Sender<SendQueueRoomError>,
551551
is_dropping: Arc<AtomicBool>,
552-
_report_media_upload_progress: Arc<AtomicBool>,
552+
report_media_upload_progress: Arc<AtomicBool>,
553553
) {
554554
trace!("spawned the sending task");
555555

@@ -607,7 +607,21 @@ impl RoomSendQueue {
607607
continue;
608608
};
609609

610-
match Self::handle_request(&room, queued_request, cancel_upload_rx, None).await {
610+
// Prepare to watch and communicate the request progress for media uploads.
611+
let media_upload_info =
612+
RoomSendQueue::try_create_media_upload_info(&queued_request, &room, &queue)
613+
.await
614+
.unwrap_or_default();
615+
let progress = RoomSendQueue::try_create_media_upload_progress_observable(
616+
&report_media_upload_progress,
617+
&media_upload_info,
618+
&related_txn_id,
619+
&updates,
620+
);
621+
622+
match Self::handle_request(&room, queued_request, cancel_upload_rx, progress.clone())
623+
.await
624+
{
611625
Ok(Some(parent_key)) => match queue.mark_as_sent(&txn_id, parent_key.clone()).await
612626
{
613627
Ok(()) => match parent_key {
@@ -621,7 +635,17 @@ impl RoomSendQueue {
621635
SentRequestKey::Media(media_info) => {
622636
let _ = updates.send(RoomSendQueueUpdate::MediaUpload {
623637
related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(),
624-
file: media_info.file,
638+
file: Some(media_info.file),
639+
index: media_upload_info.index,
640+
progress: estimate_combined_media_upload_progress(
641+
// The file finished uploading
642+
AbstractProgress {
643+
current: media_upload_info.bytes,
644+
total: media_upload_info.bytes,
645+
},
646+
media_upload_info.uploaded,
647+
media_upload_info.pending,
648+
),
625649
});
626650
}
627651
},
@@ -701,6 +725,162 @@ impl RoomSendQueue {
701725
info!("exited sending task");
702726
}
703727

728+
/// Try to create metadata required to compute the progress of a media
729+
/// upload.
730+
async fn try_create_media_upload_info(
731+
queued_request: &QueuedRequest,
732+
room: &Room,
733+
queue: &QueueStorage,
734+
) -> Option<MediaUploadInfo> {
735+
let QueuedRequestKind::MediaUpload {
736+
cache_key,
737+
thumbnail_source,
738+
related_to,
739+
#[cfg(feature = "unstable-msc4274")]
740+
accumulated,
741+
..
742+
} = &queued_request.kind
743+
else {
744+
return None;
745+
};
746+
747+
// Determine the item's index, if this is a gallery upload.
748+
let index = {
749+
cfg_if::cfg_if! {
750+
if #[cfg(feature = "unstable-msc4274")] {
751+
accumulated.len()
752+
} else {
753+
0 // Before MSC4274 only a single file (and thumbnail) could be sent per event.
754+
}
755+
}
756+
};
757+
758+
// Get the size of the file being uploaded from the event cache.
759+
let bytes = if let Ok(cache) = room.client().event_cache_store().lock().await {
760+
if let Ok(Some(content)) = cache.get_media_content(cache_key).await {
761+
content.len()
762+
} else {
763+
0
764+
}
765+
} else {
766+
0
767+
};
768+
769+
// If this is a file upload, get the size of any previously uploaded thumbnail
770+
// from the in-memory media sizes cache.
771+
let uploaded = if thumbnail_source.is_some() {
772+
if let Some(sizes) = queue.thumbnail_size_cache.lock().await.get(related_to) {
773+
sizes.get(index).copied().flatten().unwrap_or(0)
774+
} else {
775+
0
776+
}
777+
} else {
778+
0
779+
};
780+
781+
// If this is a thumbnail upload, get the size of the pending file upload from
782+
// the dependent requests.
783+
let pending = RoomSendQueue::get_dependent_pending_file_upload_size(
784+
queued_request.transaction_id.clone(),
785+
queue,
786+
room,
787+
)
788+
.await;
789+
790+
Some(MediaUploadInfo { index: index as u64, bytes, uploaded, pending })
791+
}
792+
793+
/// Determine the size of a pending file upload, if this is a thumbnail
794+
/// upload or return 0 otherwise.
795+
async fn get_dependent_pending_file_upload_size(
796+
txn_id: OwnedTransactionId,
797+
queue: &QueueStorage,
798+
room: &Room,
799+
) -> usize {
800+
let guard = queue.store.lock().await;
801+
802+
let Ok(client) = guard.client() else {
803+
return 0;
804+
};
805+
806+
let Ok(dependent_requests) =
807+
client.state_store().load_dependent_queued_requests(room.room_id()).await
808+
else {
809+
return 0;
810+
};
811+
812+
let Some((cache_key, parent_is_thumbnail_upload)) = dependent_requests.iter().find_map(|r| {
813+
if r.parent_transaction_id != txn_id {
814+
return None;
815+
}
816+
as_variant!(&r.kind, DependentQueuedRequestKind::UploadFileOrThumbnail { cache_key, parent_is_thumbnail_upload, .. } => (cache_key.clone(), *parent_is_thumbnail_upload))
817+
}) else {
818+
return 0;
819+
};
820+
821+
// If this is not a thumbnail upload, we're uploading a gallery and the
822+
// dependent request is for the next gallery item.
823+
if !parent_is_thumbnail_upload {
824+
return 0;
825+
}
826+
827+
if let Ok(cache) = room.client().event_cache_store().lock().await {
828+
if let Ok(Some(content)) = cache.get_media_content(&cache_key).await {
829+
content.len()
830+
} else {
831+
0
832+
}
833+
} else {
834+
0
835+
}
836+
}
837+
838+
/// Try to creates an observable to watch a media's upload progress.
839+
fn try_create_media_upload_progress_observable(
840+
report_media_upload_progress: &Arc<AtomicBool>,
841+
media_upload_info: &MediaUploadInfo,
842+
related_txn_id: &Option<OwnedTransactionId>,
843+
updates: &broadcast::Sender<RoomSendQueueUpdate>,
844+
) -> Option<SharedObservable<TransmissionProgress>> {
845+
if !report_media_upload_progress.load(Ordering::SeqCst) {
846+
return None;
847+
}
848+
849+
if let Some(related_txn_id) = related_txn_id {
850+
let progress: SharedObservable<TransmissionProgress> = Default::default();
851+
let mut subscriber = progress.subscribe();
852+
853+
let report_media_upload_progress = Arc::clone(report_media_upload_progress);
854+
let media_upload_info = media_upload_info.clone();
855+
let related_to = related_txn_id.clone();
856+
let updates = updates.clone();
857+
858+
// Watch and communicate the progress on a detached background task. Once
859+
// the progress observable is dropped, next() will return None and the
860+
// task will end.
861+
spawn(async move {
862+
while let Some(progress) = subscriber.next().await {
863+
if report_media_upload_progress.load(Ordering::SeqCst) {
864+
let _ = updates.send(RoomSendQueueUpdate::MediaUpload {
865+
related_to: related_to.clone(),
866+
file: None,
867+
index: media_upload_info.index,
868+
progress: estimate_combined_media_upload_progress(
869+
estimate_media_upload_progress(progress, media_upload_info.bytes),
870+
media_upload_info.uploaded,
871+
media_upload_info.pending,
872+
),
873+
});
874+
}
875+
}
876+
});
877+
878+
Some(progress)
879+
} else {
880+
None
881+
}
882+
}
883+
704884
/// Handles a single request and returns the [`SentRequestKey`] on success
705885
/// (unless the request was cancelled, in which case it'll return
706886
/// `None`).
@@ -851,6 +1031,65 @@ impl RoomSendQueue {
8511031
}
8521032
}
8531033

1034+
/// Estimates the upload progress for a single media file (either a thumbnail or
1035+
/// a file).
1036+
///
1037+
/// This proportionally maps the upload progress given in actual bytes sent
1038+
/// (possibly after encryption) into units of the unencrypted file sizes.
1039+
///
1040+
/// # Arguments
1041+
///
1042+
/// * `progress` - The [`TransmissionProgress`] of uploading the file (possibly
1043+
/// after encryption).
1044+
///
1045+
/// * `bytes` - The total number of bytes in the file before encryption.
1046+
fn estimate_media_upload_progress(
1047+
progress: TransmissionProgress,
1048+
bytes: usize,
1049+
) -> AbstractProgress {
1050+
if progress.total == 0 {
1051+
return AbstractProgress { current: 0, total: 0 };
1052+
}
1053+
1054+
// Did the file finish uploading?
1055+
if progress.current == progress.total {
1056+
return AbstractProgress { current: bytes, total: bytes };
1057+
}
1058+
1059+
// The file is still uploading. Use the rule of 3 to proportionally map the
1060+
// progress into units of the original file size.
1061+
AbstractProgress {
1062+
current: (progress.current as f64 / progress.total as f64 * bytes as f64).round() as usize,
1063+
total: bytes,
1064+
}
1065+
}
1066+
1067+
/// Estimate the combined upload progress across a media file and its
1068+
/// thumbnail.
1069+
///
1070+
/// # Arguments
1071+
///
1072+
/// * `progress` - The progress of uploading the current file mapped into units
1073+
/// of the original file size before encryption.
1074+
///
1075+
/// * `uploaded` - If this is a media file upload and an associated thumbnail
1076+
/// was previously uploaded, the number of bytes in the thumbnail before
1077+
/// encryption. Otherwise, zero.
1078+
///
1079+
/// * `pending` - If this is a thumbnail upload, the number of bytes in the
1080+
/// still to be uploaded associated media file before encryption. Otherwise,
1081+
/// zero.
1082+
fn estimate_combined_media_upload_progress(
1083+
progress: AbstractProgress,
1084+
uploaded: usize,
1085+
pending: usize,
1086+
) -> AbstractProgress {
1087+
AbstractProgress {
1088+
current: uploaded + progress.current,
1089+
total: uploaded + progress.total + pending,
1090+
}
1091+
}
1092+
8541093
impl From<&crate::Error> for QueueWedgeError {
8551094
fn from(value: &crate::Error) -> Self {
8561095
match value {
@@ -921,6 +1160,23 @@ struct BeingSentInfo {
9211160
cancel_upload: Option<oneshot::Sender<()>>,
9221161
}
9231162

1163+
/// Information needed to compute the progress of uploading a media and its
1164+
/// associated thumbnail.
1165+
#[derive(Clone, Default)]
1166+
struct MediaUploadInfo {
1167+
/// The index of the uploaded item if this is a gallery upload. Otherwise,
1168+
/// zero.
1169+
index: u64,
1170+
/// The total number of bytes in the file currently being uploaded.
1171+
bytes: usize,
1172+
/// If the current file is not a thumbnail, the total number of bytes in a
1173+
/// previously uploaded thumbnail, if any exists. Otherwise, zero.
1174+
uploaded: usize,
1175+
/// If the current file is a thumbnail, the total number of bytes in the
1176+
/// related media file still to be uploaded. Otherwise, zero.
1177+
pending: usize,
1178+
}
1179+
9241180
impl BeingSentInfo {
9251181
/// Aborts the upload, if a trigger is available.
9261182
///
@@ -2135,8 +2391,16 @@ pub enum RoomSendQueueUpdate {
21352391
/// The media event this uploaded media relates to.
21362392
related_to: OwnedTransactionId,
21372393

2138-
/// The final media source for the file that was just uploaded.
2139-
file: MediaSource,
2394+
/// The final media source for the file if it has finished uploading.
2395+
file: Option<MediaSource>,
2396+
2397+
/// The index of the media within the transaction. A file and its
2398+
/// thumbnail share the same index.
2399+
index: u64,
2400+
2401+
/// The combined upload progress across the file and, if existing, its
2402+
/// thumbnail.
2403+
progress: AbstractProgress,
21402404
},
21412405
}
21422406

crates/matrix-sdk/src/test_utils/client.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,17 @@ impl MockClientBuilder {
131131
self
132132
}
133133

134+
/// Set whether to report media upload progress via the send queue.
135+
pub fn with_enable_send_queue_media_upload_progress_reporting(
136+
mut self,
137+
enable_send_queue_media_upload_progress_reporting: bool,
138+
) -> Self {
139+
self.builder = self.builder.with_enable_send_queue_media_upload_progress_reporting(
140+
enable_send_queue_media_upload_progress_reporting,
141+
);
142+
self
143+
}
144+
134145
/// Finish building the client into the final [`Client`] instance.
135146
pub async fn build(self) -> Client {
136147
let mut builder = self.builder;

0 commit comments

Comments
 (0)