14
14
15
15
use std:: { collections:: BTreeMap , sync:: Arc , time:: Duration } ;
16
16
17
+ use futures_core:: Stream ;
18
+ use futures_util:: { pin_mut, StreamExt } ;
19
+ use matrix_sdk_base:: { crypto:: store:: types:: RoomKeyBundleInfo , RoomState } ;
17
20
use matrix_sdk_common:: failures_cache:: FailuresCache ;
18
21
use ruma:: {
19
22
events:: room:: encrypted:: { EncryptedEventScheme , OriginalSyncRoomEncryptedEvent } ,
20
23
serde:: Raw ,
21
24
OwnedEventId , OwnedRoomId ,
22
25
} ;
23
- use tokio:: sync:: {
24
- mpsc:: { self , UnboundedReceiver } ,
25
- Mutex ,
26
- } ;
27
- use tracing:: { debug, trace, warn} ;
26
+ use tokio:: sync:: { mpsc, Mutex } ;
27
+ use tracing:: { debug, info, instrument, trace, warn} ;
28
28
29
29
use crate :: {
30
30
client:: WeakClient ,
31
31
encryption:: backups:: UploadState ,
32
32
executor:: { spawn, JoinHandle } ,
33
- Client ,
33
+ room:: shared_room_history,
34
+ Client , Room ,
34
35
} ;
35
36
36
37
/// A cache of room keys we already downloaded.
@@ -41,6 +42,7 @@ pub(crate) struct ClientTasks {
41
42
pub ( crate ) upload_room_keys : Option < BackupUploadingTask > ,
42
43
pub ( crate ) download_room_keys : Option < BackupDownloadTask > ,
43
44
pub ( crate ) update_recovery_state_after_backup : Option < JoinHandle < ( ) > > ,
45
+ pub ( crate ) receive_historic_room_key_bundles : Option < BundleReceiverTask > ,
44
46
pub ( crate ) setup_e2ee : Option < JoinHandle < ( ) > > ,
45
47
}
46
48
@@ -72,7 +74,7 @@ impl BackupUploadingTask {
72
74
let _ = self . sender . send ( ( ) ) ;
73
75
}
74
76
75
- pub ( crate ) async fn listen ( client : WeakClient , mut receiver : UnboundedReceiver < ( ) > ) {
77
+ pub ( crate ) async fn listen ( client : WeakClient , mut receiver : mpsc :: UnboundedReceiver < ( ) > ) {
76
78
while receiver. recv ( ) . await . is_some ( ) {
77
79
if let Some ( client) = client. get ( ) {
78
80
let upload_progress = & client. inner . e2ee . backup_state . upload_progress ;
@@ -176,7 +178,10 @@ impl BackupDownloadTask {
176
178
/// # Arguments
177
179
///
178
180
/// * `receiver` - The source of incoming [`RoomKeyDownloadRequest`]s.
179
- async fn listen ( client : WeakClient , mut receiver : UnboundedReceiver < RoomKeyDownloadRequest > ) {
181
+ async fn listen (
182
+ client : WeakClient ,
183
+ mut receiver : mpsc:: UnboundedReceiver < RoomKeyDownloadRequest > ,
184
+ ) {
180
185
let state = Arc :: new ( Mutex :: new ( BackupDownloadTaskListenerState :: new ( client) ) ) ;
181
186
182
187
while let Some ( room_key_download_request) = receiver. recv ( ) . await {
@@ -385,6 +390,72 @@ impl BackupDownloadTaskListenerState {
385
390
}
386
391
}
387
392
393
+ pub ( crate ) struct BundleReceiverTask {
394
+ _handle : JoinHandle < ( ) > ,
395
+ }
396
+
397
+ impl BundleReceiverTask {
398
+ pub async fn new ( client : & Client ) -> Self {
399
+ let stream = client. encryption ( ) . historic_room_key_stream ( ) . await . expect ( "E2EE tasks should only be initialized once we have logged in and have access to an OlmMachine" ) ;
400
+ let weak_client = WeakClient :: from_client ( client) ;
401
+ let handle = spawn ( Self :: listen_task ( weak_client, stream) ) ;
402
+
403
+ Self { _handle : handle }
404
+ }
405
+
406
+ async fn listen_task ( client : WeakClient , stream : impl Stream < Item = RoomKeyBundleInfo > ) {
407
+ pin_mut ! ( stream) ;
408
+
409
+ // TODO: Listening to this stream is not enough for iOS due to the NSE killing
410
+ // our OlmMachine and thus also this stream. We need to add an event handler
411
+ // that will listen for the bundle event. To be able to add an event handler,
412
+ // we'll have to implement the bundle event in Ruma.
413
+ while let Some ( bundle_info) = stream. next ( ) . await {
414
+ let Some ( client) = client. get ( ) else {
415
+ // The client was dropped while we were waiting on the stream. Let's end the
416
+ // loop, since this means that the application has shut down.
417
+ break ;
418
+ } ;
419
+
420
+ let Some ( room) = client. get_room ( & bundle_info. room_id ) else {
421
+ warn ! ( room_id = %bundle_info. room_id, "Received a historic room key bundle for an unknown room" ) ;
422
+ continue ;
423
+ } ;
424
+
425
+ Self :: handle_bundle ( & room, & bundle_info) . await ;
426
+ }
427
+ }
428
+
429
+ #[ instrument( skip( room) , fields( room_id = %room. room_id( ) ) ) ]
430
+ async fn handle_bundle ( room : & Room , bundle_info : & RoomKeyBundleInfo ) {
431
+ if Self :: should_accept_bundle ( & room, & bundle_info) {
432
+ info ! ( "Accepting an out of order key bundle." ) ;
433
+
434
+ if let Err ( e) =
435
+ shared_room_history:: maybe_accept_key_bundle ( & room, & bundle_info. sender ) . await
436
+ {
437
+ warn ! ( "Couldn't accept a late room key bundle {e:?}" ) ;
438
+ }
439
+ } else {
440
+ info ! ( "Refusing to accept a historic room key bundle." ) ;
441
+ }
442
+ }
443
+
444
+ fn should_accept_bundle ( room : & Room , _bundle_info : & RoomKeyBundleInfo ) -> bool {
445
+ // TODO: Check that the person that invited us to this room is the same as the
446
+ // sender, of the bundle. Otherwise don't ignore the bundle.
447
+ // TODO: Check that we joined the room "recently". (How do you do this if you
448
+ // accept the invite on another client? I guess we remember when the transition
449
+ // from Invited to Joined happened, but can't the server force us into a joined
450
+ // state if we do this?
451
+ if room. state ( ) == RoomState :: Joined {
452
+ true
453
+ } else {
454
+ false
455
+ }
456
+ }
457
+ }
458
+
388
459
#[ cfg( all( test, not( target_family = "wasm" ) ) ) ]
389
460
mod test {
390
461
use matrix_sdk_test:: async_test;
0 commit comments