Skip to content

Commit 754d946

Browse files
committed
feat(sdk): Add a tasks that listens for historic room keys if they arrive out of order
Historic room key bundles are uploaded as an encrypted file to the media repo and the key to decrypt the file is sent as a to-device message to the recipient device. In the nominal case, the invite and this to-device message should arrive at the same time and accepting the invite would download and import the bundle. If the to-device message arrives after the invite has already been accepted we would never download and import the bundle. To mitigate this problem, this patch introduces a task that listens for bundles that arrive. If the bundle is for a room that we have joined we will consider importing the bundle.
1 parent 59c2980 commit 754d946

File tree

7 files changed

+94
-15
lines changed

7 files changed

+94
-15
lines changed

crates/matrix-sdk/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ All notable changes to this project will be documented in this file.
1414

1515
### Features
1616

17+
- Add support to accept historic room key bundles that arrive out of order, i.e.
18+
the bundle arrives after the invite has already been accepted.
19+
([#5322](https://github.com/matrix-org/matrix-rust-sdk/pull/5322))
1720
- `Client::add_event_handler`: Set `Option<EncryptionInfo>` in `EventHandlerData` for to-device messages.
1821
If the to-device message was encrypted, the `EncryptionInfo` will be set. If it is `None` the message was sent in clear.
1922
([#5099](https://github.com/matrix-org/matrix-rust-sdk/pull/5099))

crates/matrix-sdk/src/authentication/matrix/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -802,7 +802,7 @@ impl MatrixAuth {
802802
_ => None,
803803
};
804804

805-
self.client.encryption().spawn_initialization_task(auth_data);
805+
self.client.encryption().spawn_initialization_task(auth_data).await;
806806
}
807807

808808
Ok(())

crates/matrix-sdk/src/authentication/oauth/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -818,7 +818,7 @@ impl OAuth {
818818
}
819819

820820
#[cfg(feature = "e2e-encryption")]
821-
self.client.encryption().spawn_initialization_task(None);
821+
self.client.encryption().spawn_initialization_task(None).await;
822822

823823
Ok(())
824824
}
@@ -1019,7 +1019,7 @@ impl OAuth {
10191019
self.enable_cross_process_lock().await.map_err(OAuthError::from)?;
10201020

10211021
#[cfg(feature = "e2e-encryption")]
1022-
self.client.encryption().spawn_initialization_task(None);
1022+
self.client.encryption().spawn_initialization_task(None).await;
10231023
}
10241024

10251025
Ok(())

crates/matrix-sdk/src/authentication/oauth/qrcode/login.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ impl<'a> IntoFuture for LoginWithQrCode<'a> {
253253
// ourselves see us as verified and the recovery/backup states will
254254
// be known. If we did receive all the secrets in the secrets
255255
// bundle, then backups will be enabled after this step as well.
256-
self.client.encryption().spawn_initialization_task(None);
256+
self.client.encryption().spawn_initialization_task(None).await;
257257
self.client.encryption().wait_for_e2ee_initialization_tasks().await;
258258

259259
trace!("successfully logged in and enabled E2EE.");

crates/matrix-sdk/src/client/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ impl ClientInner {
415415
let client = Arc::new(client);
416416

417417
#[cfg(feature = "e2e-encryption")]
418-
client.e2ee.initialize_room_key_tasks(&client);
418+
client.e2ee.initialize_tasks(&client);
419419

420420
let _ = client
421421
.event_cache

crates/matrix-sdk/src/encryption/mod.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ use ruma::{
6262
#[cfg(feature = "experimental-send-custom-to-device")]
6363
use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices};
6464
use serde::Deserialize;
65+
use tasks::BundleReceiverTask;
6566
use tokio::sync::{Mutex, RwLockReadGuard};
6667
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
6768
use tracing::{debug, error, instrument, trace, warn};
@@ -134,7 +135,7 @@ impl EncryptionData {
134135
}
135136
}
136137

137-
pub fn initialize_room_key_tasks(&self, client: &Arc<ClientInner>) {
138+
pub fn initialize_tasks(&self, client: &Arc<ClientInner>) {
138139
let weak_client = WeakClient::from_inner(client);
139140

140141
let mut tasks = self.tasks.lock();
@@ -1685,10 +1686,13 @@ impl Encryption {
16851686
/// there is a proposal (MSC3967) to remove this requirement, which would
16861687
/// allow for the initial upload of cross-signing keys without
16871688
/// authentication, rendering this parameter obsolete.
1688-
pub(crate) fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1689+
pub(crate) async fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1690+
let bundle_receiver_task = BundleReceiverTask::new(&self.client).await;
1691+
16891692
let mut tasks = self.client.inner.e2ee.tasks.lock();
16901693

16911694
let this = self.clone();
1695+
16921696
tasks.setup_e2ee = Some(spawn(async move {
16931697
// Update the current state first, so we don't have to wait for the result of
16941698
// network requests
@@ -1707,6 +1711,11 @@ impl Encryption {
17071711
error!("Couldn't setup and resume recovery {e:?}");
17081712
}
17091713
}));
1714+
1715+
// It's fine to be async here as we're only getting the lock protecting the
1716+
// `OlmMachine`. Since the lock shouldn't be that contested right after logging
1717+
// in we won't delay the login or restoration of the Client.
1718+
tasks.receive_historic_room_key_bundles = Some(bundle_receiver_task);
17101719
}
17111720

17121721
/// Waits for end-to-end encryption initialization tasks to finish, if any

crates/matrix-sdk/src/encryption/tasks.rs

Lines changed: 75 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,24 @@
1414

1515
use std::{collections::BTreeMap, sync::Arc, time::Duration};
1616

17+
use futures_core::Stream;
18+
use futures_util::{pin_mut, StreamExt};
19+
use matrix_sdk_base::{crypto::store::types::RoomKeyBundleInfo, RoomState};
1720
use matrix_sdk_common::failures_cache::FailuresCache;
1821
use ruma::{
1922
events::room::encrypted::{EncryptedEventScheme, OriginalSyncRoomEncryptedEvent},
2023
serde::Raw,
2124
OwnedEventId, OwnedRoomId,
2225
};
23-
use tokio::sync::{
24-
mpsc::{self, UnboundedReceiver},
25-
Mutex,
26-
};
27-
use tracing::{debug, trace, warn};
26+
use tokio::sync::{mpsc, Mutex};
27+
use tracing::{debug, info, instrument, trace, warn};
2828

2929
use crate::{
3030
client::WeakClient,
3131
encryption::backups::UploadState,
3232
executor::{spawn, JoinHandle},
33-
Client,
33+
room::shared_room_history,
34+
Client, Room,
3435
};
3536

3637
/// A cache of room keys we already downloaded.
@@ -41,6 +42,7 @@ pub(crate) struct ClientTasks {
4142
pub(crate) upload_room_keys: Option<BackupUploadingTask>,
4243
pub(crate) download_room_keys: Option<BackupDownloadTask>,
4344
pub(crate) update_recovery_state_after_backup: Option<JoinHandle<()>>,
45+
pub(crate) receive_historic_room_key_bundles: Option<BundleReceiverTask>,
4446
pub(crate) setup_e2ee: Option<JoinHandle<()>>,
4547
}
4648

@@ -72,7 +74,7 @@ impl BackupUploadingTask {
7274
let _ = self.sender.send(());
7375
}
7476

75-
pub(crate) async fn listen(client: WeakClient, mut receiver: UnboundedReceiver<()>) {
77+
pub(crate) async fn listen(client: WeakClient, mut receiver: mpsc::UnboundedReceiver<()>) {
7678
while receiver.recv().await.is_some() {
7779
if let Some(client) = client.get() {
7880
let upload_progress = &client.inner.e2ee.backup_state.upload_progress;
@@ -176,7 +178,10 @@ impl BackupDownloadTask {
176178
/// # Arguments
177179
///
178180
/// * `receiver` - The source of incoming [`RoomKeyDownloadRequest`]s.
179-
async fn listen(client: WeakClient, mut receiver: UnboundedReceiver<RoomKeyDownloadRequest>) {
181+
async fn listen(
182+
client: WeakClient,
183+
mut receiver: mpsc::UnboundedReceiver<RoomKeyDownloadRequest>,
184+
) {
180185
let state = Arc::new(Mutex::new(BackupDownloadTaskListenerState::new(client)));
181186

182187
while let Some(room_key_download_request) = receiver.recv().await {
@@ -385,6 +390,68 @@ impl BackupDownloadTaskListenerState {
385390
}
386391
}
387392

393+
pub(crate) struct BundleReceiverTask {
394+
_handle: JoinHandle<()>,
395+
}
396+
397+
impl BundleReceiverTask {
398+
pub async fn new(client: &Client) -> Self {
399+
let stream = client.encryption().historic_room_key_stream().await.expect("E2EE tasks should only be initialized once we have logged in and have access to an OlmMachine");
400+
let weak_client = WeakClient::from_client(client);
401+
let handle = spawn(Self::listen_task(weak_client, stream));
402+
403+
Self { _handle: handle }
404+
}
405+
406+
async fn listen_task(client: WeakClient, stream: impl Stream<Item = RoomKeyBundleInfo>) {
407+
pin_mut!(stream);
408+
409+
// TODO: Listening to this stream is not enough for iOS due to the NSE killing
410+
// our OlmMachine and thus also this stream. We need to add an event handler
411+
// that will listen for the bundle event. To be able to add an event handler,
412+
// we'll have to implement the bundle event in Ruma.
413+
while let Some(bundle_info) = stream.next().await {
414+
let Some(client) = client.get() else {
415+
// The client was dropped while we were waiting on the stream. Let's end the
416+
// loop, since this means that the application has shut down.
417+
break;
418+
};
419+
420+
let Some(room) = client.get_room(&bundle_info.room_id) else {
421+
warn!(room_id = %bundle_info.room_id, "Received a historic room key bundle for an unknown room");
422+
continue;
423+
};
424+
425+
Self::handle_bundle(&room, &bundle_info).await;
426+
}
427+
}
428+
429+
#[instrument(skip(room), fields(room_id = %room.room_id()))]
430+
async fn handle_bundle(room: &Room, bundle_info: &RoomKeyBundleInfo) {
431+
if Self::should_accept_bundle(room, bundle_info) {
432+
info!("Accepting an out of order key bundle.");
433+
434+
if let Err(e) =
435+
shared_room_history::maybe_accept_key_bundle(room, &bundle_info.sender).await
436+
{
437+
warn!("Couldn't accept a late room key bundle {e:?}");
438+
}
439+
} else {
440+
info!("Refusing to accept a historic room key bundle.");
441+
}
442+
}
443+
444+
fn should_accept_bundle(room: &Room, _bundle_info: &RoomKeyBundleInfo) -> bool {
445+
// TODO: Check that the person that invited us to this room is the same as the
446+
// sender, of the bundle. Otherwise don't ignore the bundle.
447+
// TODO: Check that we joined the room "recently". (How do you do this if you
448+
// accept the invite on another client? I guess we remember when the transition
449+
// from Invited to Joined happened, but can't the server force us into a joined
450+
// state if we do this?
451+
room.state() == RoomState::Joined
452+
}
453+
}
454+
388455
#[cfg(all(test, not(target_family = "wasm")))]
389456
mod test {
390457
use matrix_sdk_test::async_test;

0 commit comments

Comments
 (0)