diff --git a/crates/matrix-sdk/CHANGELOG.md b/crates/matrix-sdk/CHANGELOG.md index 587e3298e9a..0ed93116bdd 100644 --- a/crates/matrix-sdk/CHANGELOG.md +++ b/crates/matrix-sdk/CHANGELOG.md @@ -14,6 +14,9 @@ All notable changes to this project will be documented in this file. ### Features +- Add support to accept historic room key bundles that arrive out of order, i.e. + the bundle arrives after the invite has already been accepted. + ([#5322](https://github.com/matrix-org/matrix-rust-sdk/pull/5322)) - `Client::add_event_handler`: Set `Option` in `EventHandlerData` for to-device messages. If the to-device message was encrypted, the `EncryptionInfo` will be set. If it is `None` the message was sent in clear. ([#5099](https://github.com/matrix-org/matrix-rust-sdk/pull/5099)) diff --git a/crates/matrix-sdk/src/authentication/matrix/mod.rs b/crates/matrix-sdk/src/authentication/matrix/mod.rs index ded0160e4b0..4ef5115f5d4 100644 --- a/crates/matrix-sdk/src/authentication/matrix/mod.rs +++ b/crates/matrix-sdk/src/authentication/matrix/mod.rs @@ -802,7 +802,7 @@ impl MatrixAuth { _ => None, }; - self.client.encryption().spawn_initialization_task(auth_data); + self.client.encryption().spawn_initialization_task(auth_data).await; } Ok(()) diff --git a/crates/matrix-sdk/src/authentication/oauth/mod.rs b/crates/matrix-sdk/src/authentication/oauth/mod.rs index 6a64dbd4aa9..d925a80464e 100644 --- a/crates/matrix-sdk/src/authentication/oauth/mod.rs +++ b/crates/matrix-sdk/src/authentication/oauth/mod.rs @@ -818,7 +818,7 @@ impl OAuth { } #[cfg(feature = "e2e-encryption")] - self.client.encryption().spawn_initialization_task(None); + self.client.encryption().spawn_initialization_task(None).await; Ok(()) } @@ -1019,7 +1019,7 @@ impl OAuth { self.enable_cross_process_lock().await.map_err(OAuthError::from)?; #[cfg(feature = "e2e-encryption")] - self.client.encryption().spawn_initialization_task(None); + self.client.encryption().spawn_initialization_task(None).await; } Ok(()) diff --git a/crates/matrix-sdk/src/authentication/oauth/qrcode/login.rs b/crates/matrix-sdk/src/authentication/oauth/qrcode/login.rs index 0fd71de008d..95f99de6d0e 100644 --- a/crates/matrix-sdk/src/authentication/oauth/qrcode/login.rs +++ b/crates/matrix-sdk/src/authentication/oauth/qrcode/login.rs @@ -253,7 +253,7 @@ impl<'a> IntoFuture for LoginWithQrCode<'a> { // ourselves see us as verified and the recovery/backup states will // be known. If we did receive all the secrets in the secrets // bundle, then backups will be enabled after this step as well. - self.client.encryption().spawn_initialization_task(None); + self.client.encryption().spawn_initialization_task(None).await; self.client.encryption().wait_for_e2ee_initialization_tasks().await; trace!("successfully logged in and enabled E2EE."); diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index 8441472eddc..1f60124b9a4 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -415,7 +415,7 @@ impl ClientInner { let client = Arc::new(client); #[cfg(feature = "e2e-encryption")] - client.e2ee.initialize_room_key_tasks(&client); + client.e2ee.initialize_tasks(&client); let _ = client .event_cache diff --git a/crates/matrix-sdk/src/encryption/mod.rs b/crates/matrix-sdk/src/encryption/mod.rs index 68632b83d1d..3673115429f 100644 --- a/crates/matrix-sdk/src/encryption/mod.rs +++ b/crates/matrix-sdk/src/encryption/mod.rs @@ -62,6 +62,7 @@ use ruma::{ #[cfg(feature = "experimental-send-custom-to-device")] use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices}; use serde::Deserialize; +use tasks::BundleReceiverTask; use tokio::sync::{Mutex, RwLockReadGuard}; use tokio_stream::wrappers::errors::BroadcastStreamRecvError; use tracing::{debug, error, instrument, trace, warn}; @@ -134,7 +135,7 @@ impl EncryptionData { } } - pub fn initialize_room_key_tasks(&self, client: &Arc) { + pub fn initialize_tasks(&self, client: &Arc) { let weak_client = WeakClient::from_inner(client); let mut tasks = self.tasks.lock(); @@ -1685,10 +1686,20 @@ impl Encryption { /// there is a proposal (MSC3967) to remove this requirement, which would /// allow for the initial upload of cross-signing keys without /// authentication, rendering this parameter obsolete. - pub(crate) fn spawn_initialization_task(&self, auth_data: Option) { + pub(crate) async fn spawn_initialization_task(&self, auth_data: Option) { + // It's fine to be async here as we're only getting the lock protecting the + // `OlmMachine`. Since the lock shouldn't be that contested right after logging + // in we won't delay the login or restoration of the Client. + let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite { + Some(BundleReceiverTask::new(&self.client).await) + } else { + None + }; + let mut tasks = self.client.inner.e2ee.tasks.lock(); let this = self.clone(); + tasks.setup_e2ee = Some(spawn(async move { // Update the current state first, so we don't have to wait for the result of // network requests @@ -1707,6 +1718,8 @@ impl Encryption { error!("Couldn't setup and resume recovery {e:?}"); } })); + + tasks.receive_historic_room_key_bundles = bundle_receiver_task; } /// Waits for end-to-end encryption initialization tasks to finish, if any diff --git a/crates/matrix-sdk/src/encryption/tasks.rs b/crates/matrix-sdk/src/encryption/tasks.rs index e89266c3a25..b69ffe701c1 100644 --- a/crates/matrix-sdk/src/encryption/tasks.rs +++ b/crates/matrix-sdk/src/encryption/tasks.rs @@ -14,23 +14,24 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration}; +use futures_core::Stream; +use futures_util::{pin_mut, StreamExt}; +use matrix_sdk_base::{crypto::store::types::RoomKeyBundleInfo, RoomState}; use matrix_sdk_common::failures_cache::FailuresCache; use ruma::{ events::room::encrypted::{EncryptedEventScheme, OriginalSyncRoomEncryptedEvent}, serde::Raw, OwnedEventId, OwnedRoomId, }; -use tokio::sync::{ - mpsc::{self, UnboundedReceiver}, - Mutex, -}; -use tracing::{debug, trace, warn}; +use tokio::sync::{mpsc, Mutex}; +use tracing::{debug, info, instrument, trace, warn}; use crate::{ client::WeakClient, encryption::backups::UploadState, executor::{spawn, JoinHandle}, - Client, + room::shared_room_history, + Client, Room, }; /// A cache of room keys we already downloaded. @@ -41,6 +42,7 @@ pub(crate) struct ClientTasks { pub(crate) upload_room_keys: Option, pub(crate) download_room_keys: Option, pub(crate) update_recovery_state_after_backup: Option>, + pub(crate) receive_historic_room_key_bundles: Option, pub(crate) setup_e2ee: Option>, } @@ -72,7 +74,7 @@ impl BackupUploadingTask { let _ = self.sender.send(()); } - pub(crate) async fn listen(client: WeakClient, mut receiver: UnboundedReceiver<()>) { + pub(crate) async fn listen(client: WeakClient, mut receiver: mpsc::UnboundedReceiver<()>) { while receiver.recv().await.is_some() { if let Some(client) = client.get() { let upload_progress = &client.inner.e2ee.backup_state.upload_progress; @@ -176,7 +178,10 @@ impl BackupDownloadTask { /// # Arguments /// /// * `receiver` - The source of incoming [`RoomKeyDownloadRequest`]s. - async fn listen(client: WeakClient, mut receiver: UnboundedReceiver) { + async fn listen( + client: WeakClient, + mut receiver: mpsc::UnboundedReceiver, + ) { let state = Arc::new(Mutex::new(BackupDownloadTaskListenerState::new(client))); while let Some(room_key_download_request) = receiver.recv().await { @@ -385,15 +390,77 @@ impl BackupDownloadTaskListenerState { } } +pub(crate) struct BundleReceiverTask { + _handle: JoinHandle<()>, +} + +impl BundleReceiverTask { + pub async fn new(client: &Client) -> Self { + let stream = client.encryption().historic_room_key_stream().await.expect("E2EE tasks should only be initialized once we have logged in and have access to an OlmMachine"); + let weak_client = WeakClient::from_client(client); + let handle = spawn(Self::listen_task(weak_client, stream)); + + Self { _handle: handle } + } + + async fn listen_task(client: WeakClient, stream: impl Stream) { + pin_mut!(stream); + + // TODO: Listening to this stream is not enough for iOS due to the NSE killing + // our OlmMachine and thus also this stream. We need to add an event handler + // that will listen for the bundle event. To be able to add an event handler, + // we'll have to implement the bundle event in Ruma. + while let Some(bundle_info) = stream.next().await { + let Some(client) = client.get() else { + // The client was dropped while we were waiting on the stream. Let's end the + // loop, since this means that the application has shut down. + break; + }; + + let Some(room) = client.get_room(&bundle_info.room_id) else { + warn!(room_id = %bundle_info.room_id, "Received a historic room key bundle for an unknown room"); + continue; + }; + + Self::handle_bundle(&room, &bundle_info).await; + } + } + + #[instrument(skip(room), fields(room_id = %room.room_id()))] + async fn handle_bundle(room: &Room, bundle_info: &RoomKeyBundleInfo) { + if Self::should_accept_bundle(room, bundle_info) { + info!("Accepting an out of order key bundle."); + + if let Err(e) = + shared_room_history::maybe_accept_key_bundle(room, &bundle_info.sender).await + { + warn!("Couldn't accept a late room key bundle {e:?}"); + } + } else { + info!("Refusing to accept a historic room key bundle."); + } + } + + fn should_accept_bundle(room: &Room, _bundle_info: &RoomKeyBundleInfo) -> bool { + // TODO: Check that the person that invited us to this room is the same as the + // sender, of the bundle. Otherwise don't ignore the bundle. + // TODO: Check that we joined the room "recently". (How do you do this if you + // accept the invite on another client? I guess we remember when the transition + // from Invited to Joined happened, but can't the server force us into a joined + // state if we do this? + room.state() == RoomState::Joined + } +} + #[cfg(all(test, not(target_family = "wasm")))] mod test { - use matrix_sdk_test::async_test; - use ruma::{event_id, room_id}; + use matrix_sdk_test::{async_test, InvitedRoomBuilder, JoinedRoomBuilder}; + use ruma::{event_id, room_id, user_id}; use serde_json::json; use wiremock::MockServer; use super::*; - use crate::test_utils::logged_in_client; + use crate::test_utils::{logged_in_client, mocks::MatrixMockServer}; // Test that, if backups are not enabled, we don't incorrectly mark a room key // as downloaded. @@ -451,4 +518,44 @@ mod test { ) } } + + /// Test that ensures that we only accept a bundle if a certain set of + /// conditions is met. + #[async_test] + async fn test_should_accept_bundle() { + let server = MatrixMockServer::new().await; + let client = server.client_builder().logged_in_with_oauth().build().await; + + let user_id = user_id!("@alice:localhost"); + let joined_room_id = room_id!("!joined:localhost"); + let invited_rom_id = room_id!("!invited:localhost"); + + server + .mock_sync() + .ok_and_run(&client, |builder| { + builder + .add_joined_room(JoinedRoomBuilder::new(joined_room_id)) + .add_invited_room(InvitedRoomBuilder::new(invited_rom_id)); + }) + .await; + + let room = + client.get_room(joined_room_id).expect("We should have access to our joined room now"); + + let bundle_info = + RoomKeyBundleInfo { sender: user_id.to_owned(), room_id: joined_room_id.to_owned() }; + + assert!(BundleReceiverTask::should_accept_bundle(&room, &bundle_info)); + + let invited_room = + client.get_room(invited_rom_id).expect("We should have access to our invited room now"); + + assert!( + !BundleReceiverTask::should_accept_bundle(&invited_room, &bundle_info), + "We should not accept a bundle if we didn't join the room." + ); + + // TODO: Add more cases here once we figure out the correct acceptance + // rules. + } } diff --git a/testing/matrix-sdk-integration-testing/src/tests/e2ee/shared_history.rs b/testing/matrix-sdk-integration-testing/src/tests/e2ee/shared_history.rs index 02087acaaf4..b87625497d6 100644 --- a/testing/matrix-sdk-integration-testing/src/tests/e2ee/shared_history.rs +++ b/testing/matrix-sdk-integration-testing/src/tests/e2ee/shared_history.rs @@ -1,11 +1,12 @@ use std::ops::Deref; use anyhow::Result; -use assert_matches2::assert_let; +use assert_matches2::{assert_let, assert_matches}; use assign::assign; use futures::{pin_mut, FutureExt, StreamExt}; use matrix_sdk::{ assert_decrypted_message_eq, + deserialized_responses::TimelineEventKind, encryption::EncryptionSettings, ruma::{ api::client::room::create_room::v3::{Request as CreateRoomRequest, RoomPreset}, @@ -128,3 +129,107 @@ async fn test_history_share_on_invite() -> Result<()> { Ok(()) } + +/// When a shared history bundle arrives after we joined we accept the bundle. +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_history_share_on_invite_out_of_order() -> Result<()> { + let alice_span = tracing::info_span!("alice"); + let bob_span = tracing::info_span!("bob"); + + let encryption_settings = + EncryptionSettings { auto_enable_cross_signing: true, ..Default::default() }; + + let alice = TestClientBuilder::new("alice") + .encryption_settings(encryption_settings) + .enable_share_history_on_invite(true) + .build() + .await?; + + let sync_service_span = tracing::info_span!(parent: &alice_span, "sync_service"); + let alice_sync_service = SyncService::builder(alice.clone()) + .with_parent_span(sync_service_span) + .build() + .await + .expect("Could not build alice sync service"); + + alice.encryption().wait_for_e2ee_initialization_tasks().await; + alice_sync_service.start().await; + + let bob = SyncTokenAwareClient::new( + TestClientBuilder::new("bob") + .encryption_settings(encryption_settings) + .enable_share_history_on_invite(true) + .build() + .await?, + ); + + // Alice creates a room ... + let alice_room = alice + .create_room(assign!(CreateRoomRequest::new(), { + preset: Some(RoomPreset::PublicChat), + })) + .await?; + alice_room.enable_encryption().await?; + + info!(room_id = ?alice_room.room_id(), "Alice has created and enabled encryption in the room"); + + // ... and sends a message + let event_id = alice_room + .send(RoomMessageEventContent::text_plain("Hello Bob")) + .await + .expect("We should be able to send a message to the room") + .event_id; + + let bundle_stream = bob + .encryption() + .historic_room_key_stream() + .await + .expect("We should be able to get the bundle stream"); + pin_mut!(bundle_stream); + + // Alice invites Bob to the room + alice_room.invite_user_by_id(bob.user_id().unwrap()).await?; + + // Alice is done. Bob has been invited and the room key bundle should have been + // sent out. Let's stop syncing so the logs contain less noise. + alice_sync_service.stop().await; + + // Let's first join without syncing, this ensures that the bundle doesn't arrive + // until after we're joined. + let bob_room = bob.join_room_by_id(alice_room.room_id()).instrument(bob_span.clone()).await?; + let event = bob_room.event(&event_id, None).instrument(bob_span.clone()).await?; + + assert_matches!( + event.kind, + TimelineEventKind::UnableToDecrypt { .. }, + "We didn't yet sync the bundle, so this should be a UTD." + ); + assert!(bundle_stream.next().now_or_never().flatten().is_none()); + + // Ok, the event is a UTD and there's no info about the bundle on the stream. + // Let's now sync to receive the bundle. + bob.sync_once().instrument(bob_span.clone()).await?; + + let info = bundle_stream + .next() + .now_or_never() + .flatten() + .expect("We should be notified about the received bundle"); + + assert_eq!(Some(info.sender.deref()), alice.user_id()); + assert_eq!(info.room_id, alice_room.room_id()); + + let event = bob_room + .event(&event_id, None) + .instrument(bob_span.clone()) + .await + .expect("Bob should be able to fetch the historic event"); + + assert_decrypted_message_eq!( + event, + "Hello Bob", + "Now that we synced, we should have accepted the bundle and the event should be decrypted" + ); + + Ok(()) +}