Skip to content

Commit 91e0c76

Browse files
committed
feat(sdk): Introduce SendQueue::subscribe.
To get updates from the `SendQueue`, one needs to use `RoomSendQueue::subscribe`, it emits `RoomSendQueueUpdate`s. However, if one wants to receive updates for all rooms managed by the `SendQueue`, instead of subscribing to all `RoomSendQueue` individually, this patch introduces a new `SendQueue::subscribe` method, which emits `SendQueueUpdate`s. It's basically a wrapper around `RoomSendQueueUpdate` by adding an `OwnedRoomId`.
1 parent 7fac1d2 commit 91e0c76

File tree

2 files changed

+126
-45
lines changed

2 files changed

+126
-45
lines changed

crates/matrix-sdk/src/send_queue/mod.rs

Lines changed: 124 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,8 @@ use ruma::{
164164
AnyMessageLikeEventContent, EventContent as _, Mentions,
165165
},
166166
serde::Raw,
167-
MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, TransactionId,
167+
MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId,
168+
TransactionId,
168169
};
169170
use tokio::sync::{broadcast, oneshot, Mutex, Notify, OwnedMutexGuard};
170171
use tracing::{debug, error, info, instrument, trace, warn};
@@ -242,7 +243,8 @@ impl SendQueue {
242243
let owned_room_id = room_id.to_owned();
243244
let room_q = RoomSendQueue::new(
244245
self.is_enabled(),
245-
data.error_reporter.clone(),
246+
data.global_update_sender.clone(),
247+
data.error_sender.clone(),
246248
data.is_dropping.clone(),
247249
&self.client,
248250
owned_room_id.clone(),
@@ -283,10 +285,18 @@ impl SendQueue {
283285
self.data().globally_enabled.load(Ordering::SeqCst)
284286
}
285287

288+
/// Subscribe to all updates for all rooms.
289+
///
290+
/// Use [`RoomSendQueue::subscribe`] to subscribe to update for a _specific
291+
/// room_.
292+
pub fn subscribe(&self) -> broadcast::Receiver<SendQueueUpdate> {
293+
self.data().global_update_sender.subscribe()
294+
}
295+
286296
/// A subscriber to the enablement status (enabled or disabled) of the
287297
/// send queue, along with useful errors.
288298
pub fn subscribe_errors(&self) -> broadcast::Receiver<SendQueueRoomError> {
289-
self.data().error_reporter.subscribe()
299+
self.data().error_sender.subscribe()
290300
}
291301
}
292302

@@ -325,8 +335,13 @@ pub(super) struct SendQueueData {
325335
/// initial enablement state.
326336
globally_enabled: AtomicBool,
327337

338+
/// Global sender to send [`SendQueueUpdate`].
339+
///
340+
/// See [`SendQueue::subscribe`].
341+
global_update_sender: broadcast::Sender<SendQueueUpdate>,
342+
328343
/// Global error updates for the send queue.
329-
error_reporter: broadcast::Sender<SendQueueRoomError>,
344+
error_sender: broadcast::Sender<SendQueueRoomError>,
330345

331346
/// Are we currently dropping the Client?
332347
is_dropping: Arc<AtomicBool>,
@@ -335,12 +350,14 @@ pub(super) struct SendQueueData {
335350
impl SendQueueData {
336351
/// Create the data for a send queue, in the given enabled state.
337352
pub fn new(globally_enabled: bool) -> Self {
338-
let (sender, _) = broadcast::channel(32);
353+
let (global_update_sender, _) = broadcast::channel(32);
354+
let (error_sender, _) = broadcast::channel(32);
339355

340356
Self {
341357
rooms: Default::default(),
342358
globally_enabled: AtomicBool::new(globally_enabled),
343-
error_reporter: sender,
359+
global_update_sender,
360+
error_sender,
344361
is_dropping: Arc::new(false.into()),
345362
}
346363
}
@@ -385,12 +402,13 @@ impl std::fmt::Debug for RoomSendQueue {
385402
impl RoomSendQueue {
386403
fn new(
387404
globally_enabled: bool,
388-
global_error_reporter: broadcast::Sender<SendQueueRoomError>,
405+
global_update_sender: broadcast::Sender<SendQueueUpdate>,
406+
global_error_sender: broadcast::Sender<SendQueueRoomError>,
389407
is_dropping: Arc<AtomicBool>,
390408
client: &Client,
391409
room_id: OwnedRoomId,
392410
) -> Self {
393-
let (updates_sender, _) = broadcast::channel(32);
411+
let (update_sender, _) = broadcast::channel(32);
394412

395413
let queue = QueueStorage::new(WeakClient::from_client(client), room_id.clone());
396414
let notifier = Arc::new(Notify::new());
@@ -402,16 +420,18 @@ impl RoomSendQueue {
402420
weak_room.clone(),
403421
queue.clone(),
404422
notifier.clone(),
405-
updates_sender.clone(),
423+
global_update_sender.clone(),
424+
update_sender.clone(),
406425
locally_enabled.clone(),
407-
global_error_reporter,
426+
global_error_sender,
408427
is_dropping,
409428
));
410429

411430
Self {
412431
inner: Arc::new(RoomSendQueueInner {
413432
room: weak_room,
414-
updates: updates_sender,
433+
global_update_sender,
434+
update_sender,
415435
_task: task,
416436
queue,
417437
notifier,
@@ -461,7 +481,7 @@ impl RoomSendQueue {
461481
created_at,
462482
};
463483

464-
let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
484+
self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
465485
transaction_id,
466486
content: LocalEchoContent::Event {
467487
serialized_event: content,
@@ -500,32 +520,50 @@ impl RoomSendQueue {
500520

501521
/// Returns the current local requests as well as a receiver to listen to
502522
/// the send queue updates, as defined in [`RoomSendQueueUpdate`].
523+
///
524+
/// Use [`SendQueue::subscribe`] to subscribe to update for _all rooms_ with
525+
/// a single receiver.
503526
pub async fn subscribe(
504527
&self,
505528
) -> Result<(Vec<LocalEcho>, broadcast::Receiver<RoomSendQueueUpdate>), RoomSendQueueError>
506529
{
507530
let local_echoes = self.inner.queue.local_echoes(self).await?;
508531

509-
Ok((local_echoes, self.inner.updates.subscribe()))
532+
Ok((local_echoes, self.inner.update_sender.subscribe()))
510533
}
511534

512535
/// A task that must be spawned in the async runtime, running in the
513536
/// background for each room that has a send queue.
514537
///
515538
/// It only progresses forward: nothing can be cancelled at any point, which
516539
/// makes the implementation not overly complicated to follow.
540+
#[allow(clippy::too_many_arguments)]
517541
#[instrument(skip_all, fields(room_id = %room.room_id()))]
518542
async fn sending_task(
519543
room: WeakRoom,
520544
queue: QueueStorage,
521545
notifier: Arc<Notify>,
522-
updates: broadcast::Sender<RoomSendQueueUpdate>,
546+
global_update_sender: broadcast::Sender<SendQueueUpdate>,
547+
update_sender: broadcast::Sender<RoomSendQueueUpdate>,
523548
locally_enabled: Arc<AtomicBool>,
524-
global_error_reporter: broadcast::Sender<SendQueueRoomError>,
549+
global_error_sender: broadcast::Sender<SendQueueRoomError>,
525550
is_dropping: Arc<AtomicBool>,
526551
) {
527552
trace!("spawned the sending task");
528553

554+
fn send_update(
555+
global_update_sender: &broadcast::Sender<SendQueueUpdate>,
556+
update_sender: &broadcast::Sender<RoomSendQueueUpdate>,
557+
room_id: &RoomId,
558+
update: RoomSendQueueUpdate,
559+
) {
560+
let _ = update_sender.send(update.clone());
561+
let _ =
562+
global_update_sender.send(SendQueueUpdate { room_id: room_id.to_owned(), update });
563+
}
564+
565+
let room_id = room.room_id();
566+
529567
loop {
530568
// A request to shut down should be preferred above everything else.
531569
if is_dropping.load(Ordering::SeqCst) {
@@ -541,7 +579,7 @@ impl RoomSendQueue {
541579
}
542580

543581
for up in new_updates {
544-
let _ = updates.send(up);
582+
send_update(&global_update_sender, &update_sender, room_id, up);
545583
}
546584

547585
if !locally_enabled.load(Ordering::SeqCst) {
@@ -585,17 +623,24 @@ impl RoomSendQueue {
585623
{
586624
Ok(()) => match parent_key {
587625
SentRequestKey::Event(event_id) => {
588-
let _ = updates.send(RoomSendQueueUpdate::SentEvent {
589-
transaction_id: txn_id,
590-
event_id,
591-
});
626+
send_update(
627+
&global_update_sender,
628+
&update_sender,
629+
room_id,
630+
RoomSendQueueUpdate::SentEvent { transaction_id: txn_id, event_id },
631+
);
592632
}
593633

594634
SentRequestKey::Media(media_info) => {
595-
let _ = updates.send(RoomSendQueueUpdate::UploadedMedia {
596-
related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(),
597-
file: media_info.file,
598-
});
635+
send_update(
636+
&global_update_sender,
637+
&update_sender,
638+
room_id,
639+
RoomSendQueueUpdate::UploadedMedia {
640+
related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(),
641+
file: media_info.file,
642+
},
643+
);
599644
}
600645
},
601646

@@ -656,17 +701,22 @@ impl RoomSendQueue {
656701

657702
let error = Arc::new(err);
658703

659-
let _ = global_error_reporter.send(SendQueueRoomError {
660-
room_id: room.room_id().to_owned(),
704+
let _ = global_error_sender.send(SendQueueRoomError {
705+
room_id: room_id.to_owned(),
661706
error: error.clone(),
662707
is_recoverable,
663708
});
664709

665-
let _ = updates.send(RoomSendQueueUpdate::SendError {
666-
transaction_id: related_txn_id.unwrap_or(txn_id),
667-
error,
668-
is_recoverable,
669-
});
710+
send_update(
711+
&global_update_sender,
712+
&update_sender,
713+
room_id,
714+
RoomSendQueueUpdate::SendError {
715+
transaction_id: related_txn_id.unwrap_or(txn_id),
716+
error,
717+
is_recoverable,
718+
},
719+
);
670720
}
671721
}
672722
}
@@ -804,6 +854,17 @@ impl RoomSendQueue {
804854
self.inner.notifier.notify_one();
805855
}
806856
}
857+
858+
/// Send an update on the room send queue channel, and on the global send
859+
/// queue channel, i.e. it sends a [`RoomSendQueueUpdate`] and a
860+
/// [`SendQueueUpdate`].
861+
fn send_update(&self, update: RoomSendQueueUpdate) {
862+
let _ = self.inner.update_sender.send(update.clone());
863+
let _ = self
864+
.inner
865+
.global_update_sender
866+
.send(SendQueueUpdate { room_id: self.inner.room.room_id().to_owned(), update });
867+
}
807868
}
808869

809870
impl From<&crate::Error> for QueueWedgeError {
@@ -840,10 +901,17 @@ struct RoomSendQueueInner {
840901
/// The room which this send queue relates to.
841902
room: WeakRoom,
842903

904+
/// Global sender to send [`SendQueueUpdate`].
905+
///
906+
/// See [`SendQueue::subscribe`].
907+
global_update_sender: broadcast::Sender<SendQueueUpdate>,
908+
843909
/// Broadcaster for notifications about the statuses of requests to be sent.
844910
///
845911
/// Can be subscribed to from the outside.
846-
updates: broadcast::Sender<RoomSendQueueUpdate>,
912+
///
913+
/// See [`RoomSendQueue::subscribe`].
914+
update_sender: broadcast::Sender<RoomSendQueueUpdate>,
847915

848916
/// Queue of requests that are either to be sent, or being sent.
849917
///
@@ -2078,6 +2146,19 @@ pub enum RoomSendQueueUpdate {
20782146
},
20792147
}
20802148

2149+
/// A [`RoomSendQueueUpdate`] with an associated [`OwnedRoomId`].
2150+
///
2151+
/// This is used by [`SendQueue::subscribe`] to get a single channel to receive
2152+
/// updates for all [`RoomSendQueue`]s.
2153+
#[derive(Clone, Debug)]
2154+
pub struct SendQueueUpdate {
2155+
/// The room where the update happened.
2156+
pub room_id: OwnedRoomId,
2157+
2158+
/// The update for this room.
2159+
pub update: RoomSendQueueUpdate,
2160+
}
2161+
20812162
/// An error triggered by the send queue module.
20822163
#[derive(Debug, thiserror::Error)]
20832164
pub enum RoomSendQueueError {
@@ -2200,7 +2281,7 @@ impl SendHandle {
22002281
for handles in &self.media_handles {
22012282
if queue.abort_upload(&self.transaction_id, handles).await? {
22022283
// Propagate a cancelled update.
2203-
let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent {
2284+
self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
22042285
transaction_id: self.transaction_id.clone(),
22052286
});
22062287

@@ -2216,7 +2297,7 @@ impl SendHandle {
22162297
trace!("successful abort");
22172298

22182299
// Propagate a cancelled update too.
2219-
let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent {
2300+
self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
22202301
transaction_id: self.transaction_id.clone(),
22212302
});
22222303

@@ -2249,7 +2330,7 @@ impl SendHandle {
22492330
self.room.inner.notifier.notify_one();
22502331

22512332
// Propagate a replaced update too.
2252-
let _ = self.room.inner.updates.send(RoomSendQueueUpdate::ReplacedLocalEvent {
2333+
self.room.send_update(RoomSendQueueUpdate::ReplacedLocalEvent {
22532334
transaction_id: self.transaction_id.clone(),
22542335
new_content: serializable,
22552336
});
@@ -2302,7 +2383,7 @@ impl SendHandle {
23022383
.map_err(RoomSendQueueStorageError::JsonSerialization)?;
23032384

23042385
// Propagate a replaced update too.
2305-
let _ = self.room.inner.updates.send(RoomSendQueueUpdate::ReplacedLocalEvent {
2386+
self.room.send_update(RoomSendQueueUpdate::ReplacedLocalEvent {
23062387
transaction_id: self.transaction_id.clone(),
23072388
new_content,
23082389
});
@@ -2344,9 +2425,9 @@ impl SendHandle {
23442425
// Wake up the queue, in case the room was asleep before unwedging the request.
23452426
room.notifier.notify_one();
23462427

2347-
let _ = room
2348-
.updates
2349-
.send(RoomSendQueueUpdate::RetryEvent { transaction_id: self.transaction_id.clone() });
2428+
self.room.send_update(RoomSendQueueUpdate::RetryEvent {
2429+
transaction_id: self.transaction_id.clone(),
2430+
});
23502431

23512432
Ok(())
23522433
}
@@ -2377,9 +2458,9 @@ impl SendHandle {
23772458
transaction_id: reaction_txn_id.clone(),
23782459
};
23792460

2380-
let _ = self.room.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2381-
// Note: we do want to use the txn_id we're going to use for the reaction, not the
2382-
// one for the event we're reacting to.
2461+
self.room.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2462+
// Note: we do want to use the `txn_id` we're going to use for the reaction, not
2463+
// the one for the event we're reacting to.
23832464
transaction_id: reaction_txn_id.into(),
23842465
content: LocalEchoContent::React {
23852466
key,
@@ -2415,7 +2496,7 @@ impl SendReactionHandle {
24152496
// Simple case: the reaction was found in the dependent event list.
24162497

24172498
// Propagate a cancelled update too.
2418-
let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent {
2499+
self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
24192500
transaction_id: self.transaction_id.clone().into(),
24202501
});
24212502

crates/matrix-sdk/src/send_queue/upload.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ impl RoomSendQueue {
267267
created_at,
268268
};
269269

270-
let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
270+
self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
271271
transaction_id: send_event_txn.clone().into(),
272272
content: LocalEchoContent::Event {
273273
serialized_event: SerializableEventContent::new(&event_content.into())
@@ -400,7 +400,7 @@ impl RoomSendQueue {
400400
created_at,
401401
};
402402

403-
let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
403+
self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
404404
transaction_id: send_event_txn.clone().into(),
405405
content: LocalEchoContent::Event {
406406
serialized_event: SerializableEventContent::new(&event_content.into())

0 commit comments

Comments
 (0)