Skip to content

feat(sdk): Add a tasks that listens for historic room keys if they arrive out of order #5322

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/matrix-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ All notable changes to this project will be documented in this file.

### Features

- Add support to accept historic room key bundles that arrive out of order, i.e.
the bundle arrives after the invite has already been accepted.
([#5322](https://github.com/matrix-org/matrix-rust-sdk/pull/5322))
- `Client::add_event_handler`: Set `Option<EncryptionInfo>` in `EventHandlerData` for to-device messages.
If the to-device message was encrypted, the `EncryptionInfo` will be set. If it is `None` the message was sent in clear.
([#5099](https://github.com/matrix-org/matrix-rust-sdk/pull/5099))
Expand Down
2 changes: 1 addition & 1 deletion crates/matrix-sdk/src/authentication/matrix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,7 @@ impl MatrixAuth {
_ => None,
};

self.client.encryption().spawn_initialization_task(auth_data);
self.client.encryption().spawn_initialization_task(auth_data).await;
}

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions crates/matrix-sdk/src/authentication/oauth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,7 @@ impl OAuth {
}

#[cfg(feature = "e2e-encryption")]
self.client.encryption().spawn_initialization_task(None);
self.client.encryption().spawn_initialization_task(None).await;

Ok(())
}
Expand Down Expand Up @@ -1019,7 +1019,7 @@ impl OAuth {
self.enable_cross_process_lock().await.map_err(OAuthError::from)?;

#[cfg(feature = "e2e-encryption")]
self.client.encryption().spawn_initialization_task(None);
self.client.encryption().spawn_initialization_task(None).await;
}

Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ impl<'a> IntoFuture for LoginWithQrCode<'a> {
// ourselves see us as verified and the recovery/backup states will
// be known. If we did receive all the secrets in the secrets
// bundle, then backups will be enabled after this step as well.
self.client.encryption().spawn_initialization_task(None);
self.client.encryption().spawn_initialization_task(None).await;
self.client.encryption().wait_for_e2ee_initialization_tasks().await;

trace!("successfully logged in and enabled E2EE.");
Expand Down
2 changes: 1 addition & 1 deletion crates/matrix-sdk/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ impl ClientInner {
let client = Arc::new(client);

#[cfg(feature = "e2e-encryption")]
client.e2ee.initialize_room_key_tasks(&client);
client.e2ee.initialize_tasks(&client);

let _ = client
.event_cache
Expand Down
17 changes: 15 additions & 2 deletions crates/matrix-sdk/src/encryption/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ use ruma::{
#[cfg(feature = "experimental-send-custom-to-device")]
use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices};
use serde::Deserialize;
use tasks::BundleReceiverTask;
use tokio::sync::{Mutex, RwLockReadGuard};
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tracing::{debug, error, instrument, trace, warn};
Expand Down Expand Up @@ -134,7 +135,7 @@ impl EncryptionData {
}
}

pub fn initialize_room_key_tasks(&self, client: &Arc<ClientInner>) {
pub fn initialize_tasks(&self, client: &Arc<ClientInner>) {
let weak_client = WeakClient::from_inner(client);

let mut tasks = self.tasks.lock();
Expand Down Expand Up @@ -1685,10 +1686,20 @@ impl Encryption {
/// there is a proposal (MSC3967) to remove this requirement, which would
/// allow for the initial upload of cross-signing keys without
/// authentication, rendering this parameter obsolete.
pub(crate) fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
pub(crate) async fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
// It's fine to be async here as we're only getting the lock protecting the
// `OlmMachine`. Since the lock shouldn't be that contested right after logging
// in we won't delay the login or restoration of the Client.
let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite {
Some(BundleReceiverTask::new(&self.client).await)
} else {
None
};

let mut tasks = self.client.inner.e2ee.tasks.lock();

let this = self.clone();

tasks.setup_e2ee = Some(spawn(async move {
// Update the current state first, so we don't have to wait for the result of
// network requests
Expand All @@ -1707,6 +1718,8 @@ impl Encryption {
error!("Couldn't setup and resume recovery {e:?}");
}
}));

tasks.receive_historic_room_key_bundles = bundle_receiver_task;
}

/// Waits for end-to-end encryption initialization tasks to finish, if any
Expand Down
129 changes: 118 additions & 11 deletions crates/matrix-sdk/src/encryption/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,24 @@

use std::{collections::BTreeMap, sync::Arc, time::Duration};

use futures_core::Stream;
use futures_util::{pin_mut, StreamExt};
use matrix_sdk_base::{crypto::store::types::RoomKeyBundleInfo, RoomState};
use matrix_sdk_common::failures_cache::FailuresCache;
use ruma::{
events::room::encrypted::{EncryptedEventScheme, OriginalSyncRoomEncryptedEvent},
serde::Raw,
OwnedEventId, OwnedRoomId,
};
use tokio::sync::{
mpsc::{self, UnboundedReceiver},
Mutex,
};
use tracing::{debug, trace, warn};
use tokio::sync::{mpsc, Mutex};
use tracing::{debug, info, instrument, trace, warn};

use crate::{
client::WeakClient,
encryption::backups::UploadState,
executor::{spawn, JoinHandle},
Client,
room::shared_room_history,
Client, Room,
};

/// A cache of room keys we already downloaded.
Expand All @@ -41,6 +42,7 @@ pub(crate) struct ClientTasks {
pub(crate) upload_room_keys: Option<BackupUploadingTask>,
pub(crate) download_room_keys: Option<BackupDownloadTask>,
pub(crate) update_recovery_state_after_backup: Option<JoinHandle<()>>,
pub(crate) receive_historic_room_key_bundles: Option<BundleReceiverTask>,
pub(crate) setup_e2ee: Option<JoinHandle<()>>,
}

Expand Down Expand Up @@ -72,7 +74,7 @@ impl BackupUploadingTask {
let _ = self.sender.send(());
}

pub(crate) async fn listen(client: WeakClient, mut receiver: UnboundedReceiver<()>) {
pub(crate) async fn listen(client: WeakClient, mut receiver: mpsc::UnboundedReceiver<()>) {
while receiver.recv().await.is_some() {
if let Some(client) = client.get() {
let upload_progress = &client.inner.e2ee.backup_state.upload_progress;
Expand Down Expand Up @@ -176,7 +178,10 @@ impl BackupDownloadTask {
/// # Arguments
///
/// * `receiver` - The source of incoming [`RoomKeyDownloadRequest`]s.
async fn listen(client: WeakClient, mut receiver: UnboundedReceiver<RoomKeyDownloadRequest>) {
async fn listen(
client: WeakClient,
mut receiver: mpsc::UnboundedReceiver<RoomKeyDownloadRequest>,
) {
let state = Arc::new(Mutex::new(BackupDownloadTaskListenerState::new(client)));

while let Some(room_key_download_request) = receiver.recv().await {
Expand Down Expand Up @@ -385,15 +390,77 @@ impl BackupDownloadTaskListenerState {
}
}

pub(crate) struct BundleReceiverTask {
_handle: JoinHandle<()>,
}

impl BundleReceiverTask {
pub async fn new(client: &Client) -> Self {
let stream = client.encryption().historic_room_key_stream().await.expect("E2EE tasks should only be initialized once we have logged in and have access to an OlmMachine");
let weak_client = WeakClient::from_client(client);
let handle = spawn(Self::listen_task(weak_client, stream));

Self { _handle: handle }
}

async fn listen_task(client: WeakClient, stream: impl Stream<Item = RoomKeyBundleInfo>) {
pin_mut!(stream);

// TODO: Listening to this stream is not enough for iOS due to the NSE killing
// our OlmMachine and thus also this stream. We need to add an event handler
// that will listen for the bundle event. To be able to add an event handler,
// we'll have to implement the bundle event in Ruma.
while let Some(bundle_info) = stream.next().await {
let Some(client) = client.get() else {
// The client was dropped while we were waiting on the stream. Let's end the
// loop, since this means that the application has shut down.
break;
};

let Some(room) = client.get_room(&bundle_info.room_id) else {
warn!(room_id = %bundle_info.room_id, "Received a historic room key bundle for an unknown room");
continue;
};

Self::handle_bundle(&room, &bundle_info).await;
}
}

#[instrument(skip(room), fields(room_id = %room.room_id()))]
async fn handle_bundle(room: &Room, bundle_info: &RoomKeyBundleInfo) {
if Self::should_accept_bundle(room, bundle_info) {
info!("Accepting an out of order key bundle.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really "out of order", right? More "late-arriving".


if let Err(e) =
shared_room_history::maybe_accept_key_bundle(room, &bundle_info.sender).await
{
warn!("Couldn't accept a late room key bundle {e:?}");
}
} else {
info!("Refusing to accept a historic room key bundle.");
}
}

fn should_accept_bundle(room: &Room, _bundle_info: &RoomKeyBundleInfo) -> bool {
// TODO: Check that the person that invited us to this room is the same as the
// sender, of the bundle. Otherwise don't ignore the bundle.
// TODO: Check that we joined the room "recently". (How do you do this if you
// accept the invite on another client? I guess we remember when the transition
// from Invited to Joined happened, but can't the server force us into a joined
// state if we do this?
room.state() == RoomState::Joined
}
}

#[cfg(all(test, not(target_family = "wasm")))]
mod test {
use matrix_sdk_test::async_test;
use ruma::{event_id, room_id};
use matrix_sdk_test::{async_test, InvitedRoomBuilder, JoinedRoomBuilder};
use ruma::{event_id, room_id, user_id};
use serde_json::json;
use wiremock::MockServer;

use super::*;
use crate::test_utils::logged_in_client;
use crate::test_utils::{logged_in_client, mocks::MatrixMockServer};

// Test that, if backups are not enabled, we don't incorrectly mark a room key
// as downloaded.
Expand Down Expand Up @@ -451,4 +518,44 @@ mod test {
)
}
}

/// Test that ensures that we only accept a bundle if a certain set of
/// conditions is met.
#[async_test]
async fn test_should_accept_bundle() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().logged_in_with_oauth().build().await;

let user_id = user_id!("@alice:localhost");
let joined_room_id = room_id!("!joined:localhost");
let invited_rom_id = room_id!("!invited:localhost");

server
.mock_sync()
.ok_and_run(&client, |builder| {
builder
.add_joined_room(JoinedRoomBuilder::new(joined_room_id))
.add_invited_room(InvitedRoomBuilder::new(invited_rom_id));
})
.await;

let room =
client.get_room(joined_room_id).expect("We should have access to our joined room now");

let bundle_info =
RoomKeyBundleInfo { sender: user_id.to_owned(), room_id: joined_room_id.to_owned() };

assert!(BundleReceiverTask::should_accept_bundle(&room, &bundle_info));

let invited_room =
client.get_room(invited_rom_id).expect("We should have access to our invited room now");

assert!(
!BundleReceiverTask::should_accept_bundle(&invited_room, &bundle_info),
"We should not accept a bundle if we didn't join the room."
);

// TODO: Add more cases here once we figure out the correct acceptance
// rules.
}
}
Loading
Loading