|
1 | 1 | use std::ops::Deref;
|
2 | 2 |
|
3 | 3 | use anyhow::Result;
|
4 |
| -use assert_matches2::assert_let; |
| 4 | +use assert_matches2::{assert_let, assert_matches}; |
5 | 5 | use assign::assign;
|
6 | 6 | use futures::{pin_mut, FutureExt, StreamExt};
|
7 | 7 | use matrix_sdk::{
|
8 | 8 | assert_decrypted_message_eq,
|
| 9 | + deserialized_responses::TimelineEventKind, |
9 | 10 | encryption::EncryptionSettings,
|
10 | 11 | ruma::{
|
11 | 12 | api::client::room::create_room::v3::{Request as CreateRoomRequest, RoomPreset},
|
@@ -128,3 +129,107 @@ async fn test_history_share_on_invite() -> Result<()> {
|
128 | 129 |
|
129 | 130 | Ok(())
|
130 | 131 | }
|
| 132 | + |
| 133 | +/// When a shared history bundle arrives after we joined we accept the bundle. |
| 134 | +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] |
| 135 | +async fn test_history_share_on_invite_out_of_order() -> Result<()> { |
| 136 | + let alice_span = tracing::info_span!("alice"); |
| 137 | + let bob_span = tracing::info_span!("bob"); |
| 138 | + |
| 139 | + let encryption_settings = |
| 140 | + EncryptionSettings { auto_enable_cross_signing: true, ..Default::default() }; |
| 141 | + |
| 142 | + let alice = TestClientBuilder::new("alice") |
| 143 | + .encryption_settings(encryption_settings) |
| 144 | + .enable_share_history_on_invite(true) |
| 145 | + .build() |
| 146 | + .await?; |
| 147 | + |
| 148 | + let sync_service_span = tracing::info_span!(parent: &alice_span, "sync_service"); |
| 149 | + let alice_sync_service = SyncService::builder(alice.clone()) |
| 150 | + .with_parent_span(sync_service_span) |
| 151 | + .build() |
| 152 | + .await |
| 153 | + .expect("Could not build alice sync service"); |
| 154 | + |
| 155 | + alice.encryption().wait_for_e2ee_initialization_tasks().await; |
| 156 | + alice_sync_service.start().await; |
| 157 | + |
| 158 | + let bob = SyncTokenAwareClient::new( |
| 159 | + TestClientBuilder::new("bob") |
| 160 | + .encryption_settings(encryption_settings) |
| 161 | + .enable_share_history_on_invite(true) |
| 162 | + .build() |
| 163 | + .await?, |
| 164 | + ); |
| 165 | + |
| 166 | + // Alice creates a room ... |
| 167 | + let alice_room = alice |
| 168 | + .create_room(assign!(CreateRoomRequest::new(), { |
| 169 | + preset: Some(RoomPreset::PublicChat), |
| 170 | + })) |
| 171 | + .await?; |
| 172 | + alice_room.enable_encryption().await?; |
| 173 | + |
| 174 | + info!(room_id = ?alice_room.room_id(), "Alice has created and enabled encryption in the room"); |
| 175 | + |
| 176 | + // ... and sends a message |
| 177 | + let event_id = alice_room |
| 178 | + .send(RoomMessageEventContent::text_plain("Hello Bob")) |
| 179 | + .await |
| 180 | + .expect("We should be able to send a message to the room") |
| 181 | + .event_id; |
| 182 | + |
| 183 | + let bundle_stream = bob |
| 184 | + .encryption() |
| 185 | + .historic_room_key_stream() |
| 186 | + .await |
| 187 | + .expect("We should be able to get the bundle stream"); |
| 188 | + pin_mut!(bundle_stream); |
| 189 | + |
| 190 | + // Alice invites Bob to the room |
| 191 | + alice_room.invite_user_by_id(bob.user_id().unwrap()).await?; |
| 192 | + |
| 193 | + // Alice is done. Bob has been invited and the room key bundle should have been |
| 194 | + // sent out. Let's stop syncing so the logs contain less noise. |
| 195 | + alice_sync_service.stop().await; |
| 196 | + |
| 197 | + // Let's first join without syncing, this ensures that the bundle doesn't arrive |
| 198 | + // until after we're joined. |
| 199 | + let bob_room = bob.join_room_by_id(alice_room.room_id()).instrument(bob_span.clone()).await?; |
| 200 | + let event = bob_room.event(&event_id, None).instrument(bob_span.clone()).await?; |
| 201 | + |
| 202 | + assert_matches!( |
| 203 | + event.kind, |
| 204 | + TimelineEventKind::UnableToDecrypt { .. }, |
| 205 | + "We didn't yet sync the bundle, so this should be a UTD." |
| 206 | + ); |
| 207 | + assert!(bundle_stream.next().now_or_never().flatten().is_none()); |
| 208 | + |
| 209 | + // Ok, the event is a UTD and there's no info about the bundle on the stream. |
| 210 | + // Let's now sync to receive the bundle. |
| 211 | + bob.sync_once().instrument(bob_span.clone()).await?; |
| 212 | + |
| 213 | + let info = bundle_stream |
| 214 | + .next() |
| 215 | + .now_or_never() |
| 216 | + .flatten() |
| 217 | + .expect("We should be notified about the received bundle"); |
| 218 | + |
| 219 | + assert_eq!(Some(info.sender.deref()), alice.user_id()); |
| 220 | + assert_eq!(info.room_id, alice_room.room_id()); |
| 221 | + |
| 222 | + let event = bob_room |
| 223 | + .event(&event_id, None) |
| 224 | + .instrument(bob_span.clone()) |
| 225 | + .await |
| 226 | + .expect("Bob should be able to fetch the historic event"); |
| 227 | + |
| 228 | + assert_decrypted_message_eq!( |
| 229 | + event, |
| 230 | + "Hello Bob", |
| 231 | + "Now that we synced, we should have accepted the bundle and the event should be decrypted" |
| 232 | + ); |
| 233 | + |
| 234 | + Ok(()) |
| 235 | +} |
0 commit comments