From 754d94651ea2b7133c6ee290f71d207ea3f55edd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Fri, 13 Jun 2025 12:53:49 +0200 Subject: [PATCH 1/4] feat(sdk): Add a tasks that listens for historic room keys if they arrive out of order Historic room key bundles are uploaded as an encrypted file to the media repo and the key to decrypt the file is sent as a to-device message to the recipient device. In the nominal case, the invite and this to-device message should arrive at the same time and accepting the invite would download and import the bundle. If the to-device message arrives after the invite has already been accepted we would never download and import the bundle. To mitigate this problem, this patch introduces a task that listens for bundles that arrive. If the bundle is for a room that we have joined we will consider importing the bundle. --- crates/matrix-sdk/CHANGELOG.md | 3 + .../src/authentication/matrix/mod.rs | 2 +- .../src/authentication/oauth/mod.rs | 4 +- .../src/authentication/oauth/qrcode/login.rs | 2 +- crates/matrix-sdk/src/client/mod.rs | 2 +- crates/matrix-sdk/src/encryption/mod.rs | 13 ++- crates/matrix-sdk/src/encryption/tasks.rs | 83 +++++++++++++++++-- 7 files changed, 94 insertions(+), 15 deletions(-) diff --git a/crates/matrix-sdk/CHANGELOG.md b/crates/matrix-sdk/CHANGELOG.md index 587e3298e9a..0ed93116bdd 100644 --- a/crates/matrix-sdk/CHANGELOG.md +++ b/crates/matrix-sdk/CHANGELOG.md @@ -14,6 +14,9 @@ All notable changes to this project will be documented in this file. ### Features +- Add support to accept historic room key bundles that arrive out of order, i.e. + the bundle arrives after the invite has already been accepted. + ([#5322](https://github.com/matrix-org/matrix-rust-sdk/pull/5322)) - `Client::add_event_handler`: Set `Option` in `EventHandlerData` for to-device messages. If the to-device message was encrypted, the `EncryptionInfo` will be set. If it is `None` the message was sent in clear. ([#5099](https://github.com/matrix-org/matrix-rust-sdk/pull/5099)) diff --git a/crates/matrix-sdk/src/authentication/matrix/mod.rs b/crates/matrix-sdk/src/authentication/matrix/mod.rs index ded0160e4b0..4ef5115f5d4 100644 --- a/crates/matrix-sdk/src/authentication/matrix/mod.rs +++ b/crates/matrix-sdk/src/authentication/matrix/mod.rs @@ -802,7 +802,7 @@ impl MatrixAuth { _ => None, }; - self.client.encryption().spawn_initialization_task(auth_data); + self.client.encryption().spawn_initialization_task(auth_data).await; } Ok(()) diff --git a/crates/matrix-sdk/src/authentication/oauth/mod.rs b/crates/matrix-sdk/src/authentication/oauth/mod.rs index 6a64dbd4aa9..d925a80464e 100644 --- a/crates/matrix-sdk/src/authentication/oauth/mod.rs +++ b/crates/matrix-sdk/src/authentication/oauth/mod.rs @@ -818,7 +818,7 @@ impl OAuth { } #[cfg(feature = "e2e-encryption")] - self.client.encryption().spawn_initialization_task(None); + self.client.encryption().spawn_initialization_task(None).await; Ok(()) } @@ -1019,7 +1019,7 @@ impl OAuth { self.enable_cross_process_lock().await.map_err(OAuthError::from)?; #[cfg(feature = "e2e-encryption")] - self.client.encryption().spawn_initialization_task(None); + self.client.encryption().spawn_initialization_task(None).await; } Ok(()) diff --git a/crates/matrix-sdk/src/authentication/oauth/qrcode/login.rs b/crates/matrix-sdk/src/authentication/oauth/qrcode/login.rs index 0fd71de008d..95f99de6d0e 100644 --- a/crates/matrix-sdk/src/authentication/oauth/qrcode/login.rs +++ b/crates/matrix-sdk/src/authentication/oauth/qrcode/login.rs @@ -253,7 +253,7 @@ impl<'a> IntoFuture for LoginWithQrCode<'a> { // ourselves see us as verified and the recovery/backup states will // be known. If we did receive all the secrets in the secrets // bundle, then backups will be enabled after this step as well. - self.client.encryption().spawn_initialization_task(None); + self.client.encryption().spawn_initialization_task(None).await; self.client.encryption().wait_for_e2ee_initialization_tasks().await; trace!("successfully logged in and enabled E2EE."); diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index 8441472eddc..1f60124b9a4 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -415,7 +415,7 @@ impl ClientInner { let client = Arc::new(client); #[cfg(feature = "e2e-encryption")] - client.e2ee.initialize_room_key_tasks(&client); + client.e2ee.initialize_tasks(&client); let _ = client .event_cache diff --git a/crates/matrix-sdk/src/encryption/mod.rs b/crates/matrix-sdk/src/encryption/mod.rs index 68632b83d1d..c4c4fc862c4 100644 --- a/crates/matrix-sdk/src/encryption/mod.rs +++ b/crates/matrix-sdk/src/encryption/mod.rs @@ -62,6 +62,7 @@ use ruma::{ #[cfg(feature = "experimental-send-custom-to-device")] use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices}; use serde::Deserialize; +use tasks::BundleReceiverTask; use tokio::sync::{Mutex, RwLockReadGuard}; use tokio_stream::wrappers::errors::BroadcastStreamRecvError; use tracing::{debug, error, instrument, trace, warn}; @@ -134,7 +135,7 @@ impl EncryptionData { } } - pub fn initialize_room_key_tasks(&self, client: &Arc) { + pub fn initialize_tasks(&self, client: &Arc) { let weak_client = WeakClient::from_inner(client); let mut tasks = self.tasks.lock(); @@ -1685,10 +1686,13 @@ impl Encryption { /// there is a proposal (MSC3967) to remove this requirement, which would /// allow for the initial upload of cross-signing keys without /// authentication, rendering this parameter obsolete. - pub(crate) fn spawn_initialization_task(&self, auth_data: Option) { + pub(crate) async fn spawn_initialization_task(&self, auth_data: Option) { + let bundle_receiver_task = BundleReceiverTask::new(&self.client).await; + let mut tasks = self.client.inner.e2ee.tasks.lock(); let this = self.clone(); + tasks.setup_e2ee = Some(spawn(async move { // Update the current state first, so we don't have to wait for the result of // network requests @@ -1707,6 +1711,11 @@ impl Encryption { error!("Couldn't setup and resume recovery {e:?}"); } })); + + // It's fine to be async here as we're only getting the lock protecting the + // `OlmMachine`. Since the lock shouldn't be that contested right after logging + // in we won't delay the login or restoration of the Client. + tasks.receive_historic_room_key_bundles = Some(bundle_receiver_task); } /// Waits for end-to-end encryption initialization tasks to finish, if any diff --git a/crates/matrix-sdk/src/encryption/tasks.rs b/crates/matrix-sdk/src/encryption/tasks.rs index e89266c3a25..0c5811b4b7f 100644 --- a/crates/matrix-sdk/src/encryption/tasks.rs +++ b/crates/matrix-sdk/src/encryption/tasks.rs @@ -14,23 +14,24 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration}; +use futures_core::Stream; +use futures_util::{pin_mut, StreamExt}; +use matrix_sdk_base::{crypto::store::types::RoomKeyBundleInfo, RoomState}; use matrix_sdk_common::failures_cache::FailuresCache; use ruma::{ events::room::encrypted::{EncryptedEventScheme, OriginalSyncRoomEncryptedEvent}, serde::Raw, OwnedEventId, OwnedRoomId, }; -use tokio::sync::{ - mpsc::{self, UnboundedReceiver}, - Mutex, -}; -use tracing::{debug, trace, warn}; +use tokio::sync::{mpsc, Mutex}; +use tracing::{debug, info, instrument, trace, warn}; use crate::{ client::WeakClient, encryption::backups::UploadState, executor::{spawn, JoinHandle}, - Client, + room::shared_room_history, + Client, Room, }; /// A cache of room keys we already downloaded. @@ -41,6 +42,7 @@ pub(crate) struct ClientTasks { pub(crate) upload_room_keys: Option, pub(crate) download_room_keys: Option, pub(crate) update_recovery_state_after_backup: Option>, + pub(crate) receive_historic_room_key_bundles: Option, pub(crate) setup_e2ee: Option>, } @@ -72,7 +74,7 @@ impl BackupUploadingTask { let _ = self.sender.send(()); } - pub(crate) async fn listen(client: WeakClient, mut receiver: UnboundedReceiver<()>) { + pub(crate) async fn listen(client: WeakClient, mut receiver: mpsc::UnboundedReceiver<()>) { while receiver.recv().await.is_some() { if let Some(client) = client.get() { let upload_progress = &client.inner.e2ee.backup_state.upload_progress; @@ -176,7 +178,10 @@ impl BackupDownloadTask { /// # Arguments /// /// * `receiver` - The source of incoming [`RoomKeyDownloadRequest`]s. - async fn listen(client: WeakClient, mut receiver: UnboundedReceiver) { + async fn listen( + client: WeakClient, + mut receiver: mpsc::UnboundedReceiver, + ) { let state = Arc::new(Mutex::new(BackupDownloadTaskListenerState::new(client))); while let Some(room_key_download_request) = receiver.recv().await { @@ -385,6 +390,68 @@ impl BackupDownloadTaskListenerState { } } +pub(crate) struct BundleReceiverTask { + _handle: JoinHandle<()>, +} + +impl BundleReceiverTask { + pub async fn new(client: &Client) -> Self { + let stream = client.encryption().historic_room_key_stream().await.expect("E2EE tasks should only be initialized once we have logged in and have access to an OlmMachine"); + let weak_client = WeakClient::from_client(client); + let handle = spawn(Self::listen_task(weak_client, stream)); + + Self { _handle: handle } + } + + async fn listen_task(client: WeakClient, stream: impl Stream) { + pin_mut!(stream); + + // TODO: Listening to this stream is not enough for iOS due to the NSE killing + // our OlmMachine and thus also this stream. We need to add an event handler + // that will listen for the bundle event. To be able to add an event handler, + // we'll have to implement the bundle event in Ruma. + while let Some(bundle_info) = stream.next().await { + let Some(client) = client.get() else { + // The client was dropped while we were waiting on the stream. Let's end the + // loop, since this means that the application has shut down. + break; + }; + + let Some(room) = client.get_room(&bundle_info.room_id) else { + warn!(room_id = %bundle_info.room_id, "Received a historic room key bundle for an unknown room"); + continue; + }; + + Self::handle_bundle(&room, &bundle_info).await; + } + } + + #[instrument(skip(room), fields(room_id = %room.room_id()))] + async fn handle_bundle(room: &Room, bundle_info: &RoomKeyBundleInfo) { + if Self::should_accept_bundle(room, bundle_info) { + info!("Accepting an out of order key bundle."); + + if let Err(e) = + shared_room_history::maybe_accept_key_bundle(room, &bundle_info.sender).await + { + warn!("Couldn't accept a late room key bundle {e:?}"); + } + } else { + info!("Refusing to accept a historic room key bundle."); + } + } + + fn should_accept_bundle(room: &Room, _bundle_info: &RoomKeyBundleInfo) -> bool { + // TODO: Check that the person that invited us to this room is the same as the + // sender, of the bundle. Otherwise don't ignore the bundle. + // TODO: Check that we joined the room "recently". (How do you do this if you + // accept the invite on another client? I guess we remember when the transition + // from Invited to Joined happened, but can't the server force us into a joined + // state if we do this? + room.state() == RoomState::Joined + } +} + #[cfg(all(test, not(target_family = "wasm")))] mod test { use matrix_sdk_test::async_test; From c63dc8fa86d03fcf41668f6b444833d411c8f228 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Fri, 13 Jun 2025 12:53:49 +0200 Subject: [PATCH 2/4] test(sdk): Test the conditions under which we accept a historic room key bundle --- crates/matrix-sdk/src/encryption/tasks.rs | 46 +++++++++++++++++++++-- 1 file changed, 43 insertions(+), 3 deletions(-) diff --git a/crates/matrix-sdk/src/encryption/tasks.rs b/crates/matrix-sdk/src/encryption/tasks.rs index 0c5811b4b7f..b69ffe701c1 100644 --- a/crates/matrix-sdk/src/encryption/tasks.rs +++ b/crates/matrix-sdk/src/encryption/tasks.rs @@ -454,13 +454,13 @@ impl BundleReceiverTask { #[cfg(all(test, not(target_family = "wasm")))] mod test { - use matrix_sdk_test::async_test; - use ruma::{event_id, room_id}; + use matrix_sdk_test::{async_test, InvitedRoomBuilder, JoinedRoomBuilder}; + use ruma::{event_id, room_id, user_id}; use serde_json::json; use wiremock::MockServer; use super::*; - use crate::test_utils::logged_in_client; + use crate::test_utils::{logged_in_client, mocks::MatrixMockServer}; // Test that, if backups are not enabled, we don't incorrectly mark a room key // as downloaded. @@ -518,4 +518,44 @@ mod test { ) } } + + /// Test that ensures that we only accept a bundle if a certain set of + /// conditions is met. + #[async_test] + async fn test_should_accept_bundle() { + let server = MatrixMockServer::new().await; + let client = server.client_builder().logged_in_with_oauth().build().await; + + let user_id = user_id!("@alice:localhost"); + let joined_room_id = room_id!("!joined:localhost"); + let invited_rom_id = room_id!("!invited:localhost"); + + server + .mock_sync() + .ok_and_run(&client, |builder| { + builder + .add_joined_room(JoinedRoomBuilder::new(joined_room_id)) + .add_invited_room(InvitedRoomBuilder::new(invited_rom_id)); + }) + .await; + + let room = + client.get_room(joined_room_id).expect("We should have access to our joined room now"); + + let bundle_info = + RoomKeyBundleInfo { sender: user_id.to_owned(), room_id: joined_room_id.to_owned() }; + + assert!(BundleReceiverTask::should_accept_bundle(&room, &bundle_info)); + + let invited_room = + client.get_room(invited_rom_id).expect("We should have access to our invited room now"); + + assert!( + !BundleReceiverTask::should_accept_bundle(&invited_room, &bundle_info), + "We should not accept a bundle if we didn't join the room." + ); + + // TODO: Add more cases here once we figure out the correct acceptance + // rules. + } } From bc5afce0ce370791f8fac6d9f5ee9e0a7562ee03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Fri, 13 Jun 2025 12:53:49 +0200 Subject: [PATCH 3/4] test(sdk): Test that we accept historic room key bundles if they arrive out of order --- .../src/tests/e2ee/shared_history.rs | 107 +++++++++++++++++- 1 file changed, 106 insertions(+), 1 deletion(-) diff --git a/testing/matrix-sdk-integration-testing/src/tests/e2ee/shared_history.rs b/testing/matrix-sdk-integration-testing/src/tests/e2ee/shared_history.rs index 02087acaaf4..b87625497d6 100644 --- a/testing/matrix-sdk-integration-testing/src/tests/e2ee/shared_history.rs +++ b/testing/matrix-sdk-integration-testing/src/tests/e2ee/shared_history.rs @@ -1,11 +1,12 @@ use std::ops::Deref; use anyhow::Result; -use assert_matches2::assert_let; +use assert_matches2::{assert_let, assert_matches}; use assign::assign; use futures::{pin_mut, FutureExt, StreamExt}; use matrix_sdk::{ assert_decrypted_message_eq, + deserialized_responses::TimelineEventKind, encryption::EncryptionSettings, ruma::{ api::client::room::create_room::v3::{Request as CreateRoomRequest, RoomPreset}, @@ -128,3 +129,107 @@ async fn test_history_share_on_invite() -> Result<()> { Ok(()) } + +/// When a shared history bundle arrives after we joined we accept the bundle. +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_history_share_on_invite_out_of_order() -> Result<()> { + let alice_span = tracing::info_span!("alice"); + let bob_span = tracing::info_span!("bob"); + + let encryption_settings = + EncryptionSettings { auto_enable_cross_signing: true, ..Default::default() }; + + let alice = TestClientBuilder::new("alice") + .encryption_settings(encryption_settings) + .enable_share_history_on_invite(true) + .build() + .await?; + + let sync_service_span = tracing::info_span!(parent: &alice_span, "sync_service"); + let alice_sync_service = SyncService::builder(alice.clone()) + .with_parent_span(sync_service_span) + .build() + .await + .expect("Could not build alice sync service"); + + alice.encryption().wait_for_e2ee_initialization_tasks().await; + alice_sync_service.start().await; + + let bob = SyncTokenAwareClient::new( + TestClientBuilder::new("bob") + .encryption_settings(encryption_settings) + .enable_share_history_on_invite(true) + .build() + .await?, + ); + + // Alice creates a room ... + let alice_room = alice + .create_room(assign!(CreateRoomRequest::new(), { + preset: Some(RoomPreset::PublicChat), + })) + .await?; + alice_room.enable_encryption().await?; + + info!(room_id = ?alice_room.room_id(), "Alice has created and enabled encryption in the room"); + + // ... and sends a message + let event_id = alice_room + .send(RoomMessageEventContent::text_plain("Hello Bob")) + .await + .expect("We should be able to send a message to the room") + .event_id; + + let bundle_stream = bob + .encryption() + .historic_room_key_stream() + .await + .expect("We should be able to get the bundle stream"); + pin_mut!(bundle_stream); + + // Alice invites Bob to the room + alice_room.invite_user_by_id(bob.user_id().unwrap()).await?; + + // Alice is done. Bob has been invited and the room key bundle should have been + // sent out. Let's stop syncing so the logs contain less noise. + alice_sync_service.stop().await; + + // Let's first join without syncing, this ensures that the bundle doesn't arrive + // until after we're joined. + let bob_room = bob.join_room_by_id(alice_room.room_id()).instrument(bob_span.clone()).await?; + let event = bob_room.event(&event_id, None).instrument(bob_span.clone()).await?; + + assert_matches!( + event.kind, + TimelineEventKind::UnableToDecrypt { .. }, + "We didn't yet sync the bundle, so this should be a UTD." + ); + assert!(bundle_stream.next().now_or_never().flatten().is_none()); + + // Ok, the event is a UTD and there's no info about the bundle on the stream. + // Let's now sync to receive the bundle. + bob.sync_once().instrument(bob_span.clone()).await?; + + let info = bundle_stream + .next() + .now_or_never() + .flatten() + .expect("We should be notified about the received bundle"); + + assert_eq!(Some(info.sender.deref()), alice.user_id()); + assert_eq!(info.room_id, alice_room.room_id()); + + let event = bob_room + .event(&event_id, None) + .instrument(bob_span.clone()) + .await + .expect("Bob should be able to fetch the historic event"); + + assert_decrypted_message_eq!( + event, + "Hello Bob", + "Now that we synced, we should have accepted the bundle and the event should be decrypted" + ); + + Ok(()) +} From 636865411f37a356324d59ba1e1e9a1d4b58c75b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Wed, 2 Jul 2025 14:08:56 +0200 Subject: [PATCH 4/4] fixup! feat(sdk): Add a tasks that listens for historic room keys if they arrive out of order --- crates/matrix-sdk/src/encryption/mod.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/crates/matrix-sdk/src/encryption/mod.rs b/crates/matrix-sdk/src/encryption/mod.rs index c4c4fc862c4..3673115429f 100644 --- a/crates/matrix-sdk/src/encryption/mod.rs +++ b/crates/matrix-sdk/src/encryption/mod.rs @@ -1687,7 +1687,14 @@ impl Encryption { /// allow for the initial upload of cross-signing keys without /// authentication, rendering this parameter obsolete. pub(crate) async fn spawn_initialization_task(&self, auth_data: Option) { - let bundle_receiver_task = BundleReceiverTask::new(&self.client).await; + // It's fine to be async here as we're only getting the lock protecting the + // `OlmMachine`. Since the lock shouldn't be that contested right after logging + // in we won't delay the login or restoration of the Client. + let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite { + Some(BundleReceiverTask::new(&self.client).await) + } else { + None + }; let mut tasks = self.client.inner.e2ee.tasks.lock(); @@ -1712,10 +1719,7 @@ impl Encryption { } })); - // It's fine to be async here as we're only getting the lock protecting the - // `OlmMachine`. Since the lock shouldn't be that contested right after logging - // in we won't delay the login or restoration of the Client. - tasks.receive_historic_room_key_bundles = Some(bundle_receiver_task); + tasks.receive_historic_room_key_bundles = bundle_receiver_task; } /// Waits for end-to-end encryption initialization tasks to finish, if any