Skip to content

Commit d9a7da6

Browse files
committed
feat(send_queue): communicate media upload progress in RoomSendQueueUpdate::MediaUpload
1 parent 5c0bb6a commit d9a7da6

File tree

3 files changed

+319
-13
lines changed

3 files changed

+319
-13
lines changed

crates/matrix-sdk/src/send_queue/mod.rs

Lines changed: 253 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ use crate::{
177177
config::RequestConfig,
178178
error::RetryKind,
179179
room::{edit::EditedContent, WeakRoom},
180-
Client, Media, Room, TransmissionProgress,
180+
AbstractProgress, Client, Media, Room, TransmissionProgress,
181181
};
182182

183183
mod upload;
@@ -549,7 +549,7 @@ impl RoomSendQueue {
549549
locally_enabled: Arc<AtomicBool>,
550550
global_error_reporter: broadcast::Sender<SendQueueRoomError>,
551551
is_dropping: Arc<AtomicBool>,
552-
_report_media_upload_progress: Arc<AtomicBool>,
552+
report_media_upload_progress: Arc<AtomicBool>,
553553
) {
554554
trace!("spawned the sending task");
555555

@@ -607,7 +607,21 @@ impl RoomSendQueue {
607607
continue;
608608
};
609609

610-
match Self::handle_request(&room, queued_request, cancel_upload_rx, None).await {
610+
// Prepare to watch and communicate the request progress for media uploads.
611+
let media_upload_info =
612+
RoomSendQueue::try_create_media_upload_info(&queued_request, &room, &queue)
613+
.await
614+
.unwrap_or_default();
615+
let progress = RoomSendQueue::try_create_media_upload_progress_observable(
616+
&report_media_upload_progress,
617+
&media_upload_info,
618+
&related_txn_id,
619+
&updates,
620+
);
621+
622+
match Self::handle_request(&room, queued_request, cancel_upload_rx, progress.clone())
623+
.await
624+
{
611625
Ok(Some(parent_key)) => match queue.mark_as_sent(&txn_id, parent_key.clone()).await
612626
{
613627
Ok(()) => match parent_key {
@@ -621,7 +635,14 @@ impl RoomSendQueue {
621635
SentRequestKey::Media(media_info) => {
622636
let _ = updates.send(RoomSendQueueUpdate::MediaUpload {
623637
related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(),
624-
file: media_info.file,
638+
file: Some(media_info.file),
639+
index: media_upload_info.index,
640+
progress: compute_combined_progress(
641+
progress.unwrap_or_default().get(),
642+
media_upload_info.bytes,
643+
media_upload_info.uploaded,
644+
media_upload_info.pending,
645+
),
625646
});
626647
}
627648
},
@@ -701,6 +722,163 @@ impl RoomSendQueue {
701722
info!("exited sending task");
702723
}
703724

725+
/// Try to create metadata required to compute the progress of a media
726+
/// upload.
727+
async fn try_create_media_upload_info(
728+
queued_request: &QueuedRequest,
729+
room: &Room,
730+
queue: &QueueStorage,
731+
) -> Option<MediaUploadInfo> {
732+
let QueuedRequestKind::MediaUpload {
733+
cache_key,
734+
thumbnail_source,
735+
related_to,
736+
#[cfg(feature = "unstable-msc4274")]
737+
accumulated,
738+
..
739+
} = &queued_request.kind
740+
else {
741+
return None;
742+
};
743+
744+
// Determine the item's index, if this is a gallery upload.
745+
let index = {
746+
cfg_if::cfg_if! {
747+
if #[cfg(feature = "unstable-msc4274")] {
748+
accumulated.len()
749+
} else {
750+
0 // Before MSC4274 only a single file (and thumbnail) could be sent per event.
751+
}
752+
}
753+
};
754+
755+
// Get the size of the file being uploaded from the event cache.
756+
let bytes = if let Ok(cache) = room.client().event_cache_store().lock().await {
757+
if let Ok(Some(content)) = cache.get_media_content(cache_key).await {
758+
content.len()
759+
} else {
760+
0
761+
}
762+
} else {
763+
0
764+
};
765+
766+
// If this is a file upload, get the size of any previously uploaded thumbnail
767+
// from the in-memory media sizes cache.
768+
let uploaded = if thumbnail_source.is_some() {
769+
if let Some(sizes) = queue.thumbnail_size_cache.lock().await.get(related_to) {
770+
sizes.get(index).copied().flatten().unwrap_or(0)
771+
} else {
772+
0
773+
}
774+
} else {
775+
0
776+
};
777+
778+
// If this is a thumbnail upload, get the size of the pending file upload from
779+
// the dependent requests.
780+
let pending = RoomSendQueue::get_dependent_pending_file_upload_size(
781+
queued_request.transaction_id.clone(),
782+
queue,
783+
room,
784+
)
785+
.await;
786+
787+
Some(MediaUploadInfo { index: index as u64, bytes, uploaded, pending })
788+
}
789+
790+
/// Determine the size of a pending file upload, if this is a thumbnail
791+
/// upload or return 0 otherwise.
792+
async fn get_dependent_pending_file_upload_size(
793+
txn_id: OwnedTransactionId,
794+
queue: &QueueStorage,
795+
room: &Room,
796+
) -> usize {
797+
let guard = queue.store.lock().await;
798+
799+
let Ok(client) = guard.client() else {
800+
return 0;
801+
};
802+
803+
let Ok(dependent_requests) =
804+
client.state_store().load_dependent_queued_requests(room.room_id()).await
805+
else {
806+
return 0;
807+
};
808+
809+
let Some((cache_key, parent_is_thumbnail_upload)) = dependent_requests.iter().find_map(|r| {
810+
if r.parent_transaction_id != txn_id {
811+
return None;
812+
}
813+
as_variant!(&r.kind, DependentQueuedRequestKind::UploadFileOrThumbnail { cache_key, parent_is_thumbnail_upload, .. } => (cache_key.clone(), *parent_is_thumbnail_upload))
814+
}) else {
815+
return 0;
816+
};
817+
818+
// If this is not a thumbnail upload, we're uploading a gallery and the
819+
// dependent request is for the next gallery item.
820+
if !parent_is_thumbnail_upload {
821+
return 0;
822+
}
823+
824+
if let Ok(cache) = room.client().event_cache_store().lock().await {
825+
if let Ok(Some(content)) = cache.get_media_content(&cache_key).await {
826+
content.len()
827+
} else {
828+
0
829+
}
830+
} else {
831+
0
832+
}
833+
}
834+
835+
/// Try to creates an observable to watch a media's upload progress.
836+
fn try_create_media_upload_progress_observable(
837+
report_media_upload_progress: &Arc<AtomicBool>,
838+
media_upload_info: &MediaUploadInfo,
839+
related_txn_id: &Option<OwnedTransactionId>,
840+
updates: &broadcast::Sender<RoomSendQueueUpdate>,
841+
) -> Option<SharedObservable<TransmissionProgress>> {
842+
if !report_media_upload_progress.load(Ordering::SeqCst) {
843+
return None;
844+
}
845+
846+
if let Some(related_txn_id) = related_txn_id {
847+
let progress: SharedObservable<TransmissionProgress> = Default::default();
848+
let mut subscriber = progress.subscribe();
849+
850+
let report_media_upload_progress = Arc::clone(report_media_upload_progress);
851+
let media_upload_info = media_upload_info.clone();
852+
let related_to = related_txn_id.clone();
853+
let updates = updates.clone();
854+
855+
// Watch and communicate the progress on a detached background task. Once
856+
// the progress observable is dropped, next() will return None and the
857+
// task will end.
858+
spawn(async move {
859+
while let Some(progress) = subscriber.next().await {
860+
if report_media_upload_progress.load(Ordering::SeqCst) {
861+
let _ = updates.send(RoomSendQueueUpdate::MediaUpload {
862+
related_to: related_to.clone(),
863+
file: None,
864+
index: media_upload_info.index,
865+
progress: compute_combined_progress(
866+
progress,
867+
media_upload_info.bytes,
868+
media_upload_info.uploaded,
869+
media_upload_info.pending,
870+
),
871+
});
872+
}
873+
}
874+
});
875+
876+
Some(progress)
877+
} else {
878+
None
879+
}
880+
}
881+
704882
/// Handles a single request and returns the [`SentRequestKey`] on success
705883
/// (unless the request was cancelled, in which case it'll return
706884
/// `None`).
@@ -851,6 +1029,50 @@ impl RoomSendQueue {
8511029
}
8521030
}
8531031

1032+
/// Computes the combined upload progress across a media file and its
1033+
/// thumbnail.
1034+
///
1035+
/// # Arguments
1036+
///
1037+
/// * `progress` - The [`TransmissionProgress`] for uploading the current file.
1038+
///
1039+
/// * `bytes` - The total number of bytes in the file being uploaded regardless
1040+
/// of whether it is a media or a thumbnail.
1041+
///
1042+
/// * `uploaded` - If this is a media file upload and an associated thumbnail
1043+
/// was previously uploaded, the number of bytes in the thumbnail. Otherwise,
1044+
/// zero.
1045+
///
1046+
/// * `pending` - If this is a thumbnail upload, the number of bytes in the
1047+
/// associated media file that is still pending to be uploaded. Otherwise,
1048+
/// zero.
1049+
fn compute_combined_progress(
1050+
progress: TransmissionProgress,
1051+
bytes: usize,
1052+
uploaded: usize,
1053+
pending: usize,
1054+
) -> AbstractProgress {
1055+
if progress.total == 0 {
1056+
AbstractProgress { current: 0, total: 0 }
1057+
} else {
1058+
// The uploaded file is possibly encrypted and, thus, larger than
1059+
// the original one. Use the rule of 3 to proportionally map the
1060+
// progress into units of the original file size.
1061+
let mut current =
1062+
(progress.current as f64 / progress.total as f64 * bytes as f64).round() as usize;
1063+
let mut total = bytes;
1064+
1065+
// Add the size of any previously uploaded thumbnail to the current and total.
1066+
current += uploaded;
1067+
total += uploaded;
1068+
1069+
// Add the size of any pending media upload to the total.
1070+
total += pending;
1071+
1072+
AbstractProgress { current, total }
1073+
}
1074+
}
1075+
8541076
impl From<&crate::Error> for QueueWedgeError {
8551077
fn from(value: &crate::Error) -> Self {
8561078
match value {
@@ -921,6 +1143,23 @@ struct BeingSentInfo {
9211143
cancel_upload: Option<oneshot::Sender<()>>,
9221144
}
9231145

1146+
/// Information needed to compute the progress of uploading a media and its
1147+
/// associated thumbnail.
1148+
#[derive(Clone, Default)]
1149+
struct MediaUploadInfo {
1150+
/// The index of the uploaded item if this is a gallery upload. Otherwise,
1151+
/// zero.
1152+
index: u64,
1153+
/// The total number of bytes in the file currently being uploaded.
1154+
bytes: usize,
1155+
/// If the current file is not a thumbnail, the total number of bytes in a
1156+
/// previously uploaded thumbnail, if any exists. Otherwise, zero.
1157+
uploaded: usize,
1158+
/// If the current file is a thumbnail, the total number of bytes in the
1159+
/// related media file still to be uploaded. Otherwise, zero.
1160+
pending: usize,
1161+
}
1162+
9241163
impl BeingSentInfo {
9251164
/// Aborts the upload, if a trigger is available.
9261165
///
@@ -2135,8 +2374,16 @@ pub enum RoomSendQueueUpdate {
21352374
/// The media event this uploaded media relates to.
21362375
related_to: OwnedTransactionId,
21372376

2138-
/// The final media source for the file that was just uploaded.
2139-
file: MediaSource,
2377+
/// The final media source for the file if it has finished uploading.
2378+
file: Option<MediaSource>,
2379+
2380+
/// The index of the media within the transaction. A file and its
2381+
/// thumbnail share the same index.
2382+
index: u64,
2383+
2384+
/// The combined upload progress across the file and, if existing, its
2385+
/// thumbnail.
2386+
progress: AbstractProgress,
21402387
},
21412388
}
21422389

crates/matrix-sdk/src/test_utils/client.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,17 @@ impl MockClientBuilder {
131131
self
132132
}
133133

134+
/// Set whether to report media upload progress via the send queue.
135+
pub fn with_enable_send_queue_media_upload_progress_reporting(
136+
mut self,
137+
enable_send_queue_media_upload_progress_reporting: bool,
138+
) -> Self {
139+
self.builder = self.builder.with_enable_send_queue_media_upload_progress_reporting(
140+
enable_send_queue_media_upload_progress_reporting,
141+
);
142+
self
143+
}
144+
134145
/// Finish building the client into the final [`Client`] instance.
135146
pub async fn build(self) -> Client {
136147
let mut builder = self.builder;

0 commit comments

Comments
 (0)