@@ -194,17 +194,14 @@ fn wrap_room_member_events(
194
194
195
195
#[ cfg( test) ]
196
196
mod tests {
197
- use std:: {
198
- pin:: { pin, Pin } ,
199
- time:: Duration ,
200
- } ;
201
-
202
- use futures_core:: Stream ;
203
- use futures_util:: FutureExt ;
204
- use matrix_sdk_base:: crypto:: { IdentityState , IdentityStatusChange } ;
197
+ use std:: time:: Duration ;
198
+
199
+ use futures_util:: { pin_mut, FutureExt as _, StreamExt as _} ;
200
+ use matrix_sdk_base:: crypto:: IdentityState ;
205
201
use matrix_sdk_test:: { async_test, test_json:: keys_query_sets:: IdentityChangeDataSet } ;
206
202
use test_setup:: TestSetup ;
207
- use tokio_stream:: { StreamExt , Timeout } ;
203
+
204
+ use crate :: assert_next_with_timeout;
208
205
209
206
#[ async_test]
210
207
async fn test_when_user_becomes_unpinned_we_report_it ( ) {
@@ -215,13 +212,14 @@ mod tests {
215
212
t. pin_bob ( ) . await ;
216
213
217
214
// And we are listening for identity changes
218
- let changes = t. subscribe_to_identity_status_changes ( ) . await ;
215
+ let stream = t. subscribe_to_identity_status_changes ( ) . await ;
216
+ pin_mut ! ( stream) ;
219
217
220
218
// When Bob becomes unpinned
221
219
t. unpin_bob ( ) . await ;
222
220
223
221
// Then we were notified about it
224
- let change = next_change ( & mut pin ! ( changes ) ) . await ;
222
+ let change = assert_next_with_timeout ! ( stream ) ;
225
223
assert_eq ! ( change[ 0 ] . user_id, t. bob_user_id( ) ) ;
226
224
assert_eq ! ( change[ 0 ] . changed_to, IdentityState :: PinViolation ) ;
227
225
assert_eq ! ( change. len( ) , 1 ) ;
@@ -236,13 +234,14 @@ mod tests {
236
234
t. verify_bob ( ) . await ;
237
235
238
236
// And we are listening for identity changes
239
- let changes = t. subscribe_to_identity_status_changes ( ) . await ;
237
+ let stream = t. subscribe_to_identity_status_changes ( ) . await ;
238
+ pin_mut ! ( stream) ;
240
239
241
240
// When Bob's identity changes
242
241
t. unpin_bob ( ) . await ;
243
242
244
243
// Then we were notified about a verification violation
245
- let change = next_change ( & mut pin ! ( changes ) ) . await ;
244
+ let change = assert_next_with_timeout ! ( stream ) ;
246
245
assert_eq ! ( change[ 0 ] . user_id, t. bob_user_id( ) ) ;
247
246
assert_eq ! ( change[ 0 ] . changed_to, IdentityState :: VerificationViolation ) ;
248
247
assert_eq ! ( change. len( ) , 1 ) ;
@@ -257,20 +256,20 @@ mod tests {
257
256
t. unpin_bob ( ) . await ;
258
257
259
258
// And we are listening for identity changes
260
- let changes = t. subscribe_to_identity_status_changes ( ) . await ;
261
- let mut changes = pin ! ( changes ) ;
259
+ let stream = t. subscribe_to_identity_status_changes ( ) . await ;
260
+ pin_mut ! ( stream ) ;
262
261
263
262
// When Bob becomes pinned
264
263
t. pin_bob ( ) . await ;
265
264
266
265
// Then we were notified about the initial state of the room
267
- let change1 = next_change ( & mut changes ) . await ;
266
+ let change1 = assert_next_with_timeout ! ( stream ) ;
268
267
assert_eq ! ( change1[ 0 ] . user_id, t. bob_user_id( ) ) ;
269
268
assert_eq ! ( change1[ 0 ] . changed_to, IdentityState :: PinViolation ) ;
270
269
assert_eq ! ( change1. len( ) , 1 ) ;
271
270
272
271
// And the change when Bob became pinned
273
- let change2 = next_change ( & mut changes ) . await ;
272
+ let change2 = assert_next_with_timeout ! ( stream ) ;
274
273
assert_eq ! ( change2[ 0 ] . user_id, t. bob_user_id( ) ) ;
275
274
assert_eq ! ( change2[ 0 ] . changed_to, IdentityState :: Pinned ) ;
276
275
assert_eq ! ( change2. len( ) , 1 ) ;
@@ -282,8 +281,8 @@ mod tests {
282
281
let t = TestSetup :: new_room_with_other_bob ( ) . await ;
283
282
284
283
// And we are listening for identity changes
285
- let changes = t. subscribe_to_identity_status_changes ( ) . await ;
286
- let mut changes = pin ! ( changes ) ;
284
+ let stream = t. subscribe_to_identity_status_changes ( ) . await ;
285
+ pin_mut ! ( stream ) ;
287
286
288
287
// When Bob becomes verified
289
288
t. verify_bob ( ) . await ;
@@ -292,10 +291,10 @@ mod tests {
292
291
t. unpin_bob ( ) . await ;
293
292
294
293
// Then we are only notified about the unpinning part
295
- let change2 = next_change ( & mut changes ) . await ;
296
- assert_eq ! ( change2 [ 0 ] . user_id, t. bob_user_id( ) ) ;
297
- assert_eq ! ( change2 [ 0 ] . changed_to, IdentityState :: VerificationViolation ) ;
298
- assert_eq ! ( change2 . len( ) , 1 ) ;
294
+ let change = assert_next_with_timeout ! ( stream ) ;
295
+ assert_eq ! ( change [ 0 ] . user_id, t. bob_user_id( ) ) ;
296
+ assert_eq ! ( change [ 0 ] . changed_to, IdentityState :: VerificationViolation ) ;
297
+ assert_eq ! ( change . len( ) , 1 ) ;
299
298
}
300
299
301
300
#[ async_test]
@@ -307,20 +306,20 @@ mod tests {
307
306
t. unpin_bob_with ( IdentityChangeDataSet :: key_query_with_identity_a ( ) ) . await ;
308
307
309
308
// And we are listening for identity changes
310
- let changes = t. subscribe_to_identity_status_changes ( ) . await ;
311
- let mut changes = pin ! ( changes ) ;
309
+ let stream = t. subscribe_to_identity_status_changes ( ) . await ;
310
+ pin_mut ! ( stream ) ;
312
311
313
312
// When Bob becomes verified
314
313
t. verify_bob ( ) . await ;
315
314
316
315
// Then we were notified about the initial state of the room
317
- let change1 = next_change ( & mut changes ) . await ;
316
+ let change1 = assert_next_with_timeout ! ( stream ) ;
318
317
assert_eq ! ( change1[ 0 ] . user_id, t. bob_user_id( ) ) ;
319
318
assert_eq ! ( change1[ 0 ] . changed_to, IdentityState :: PinViolation ) ;
320
319
assert_eq ! ( change1. len( ) , 1 ) ;
321
320
322
321
// And the change when Bob became verified
323
- let change2 = next_change ( & mut changes ) . await ;
322
+ let change2 = assert_next_with_timeout ! ( stream ) ;
324
323
assert_eq ! ( change2[ 0 ] . user_id, t. bob_user_id( ) ) ;
325
324
assert_eq ! ( change2[ 0 ] . changed_to, IdentityState :: Verified ) ;
326
325
assert_eq ! ( change2. len( ) , 1 ) ;
@@ -341,20 +340,20 @@ mod tests {
341
340
t. unpin_bob ( ) . await ;
342
341
343
342
// And we are listening for identity changes
344
- let changes = t. subscribe_to_identity_status_changes ( ) . await ;
345
- let mut changes = pin ! ( changes ) ;
343
+ let stream = t. subscribe_to_identity_status_changes ( ) . await ;
344
+ pin_mut ! ( stream ) ;
346
345
347
346
// When Bob becomes verified
348
347
t. verify_bob ( ) . await ;
349
348
350
349
// Then we were notified about the initial state of the room
351
- let change1 = next_change ( & mut changes ) . await ;
350
+ let change1 = assert_next_with_timeout ! ( stream ) ;
352
351
assert_eq ! ( change1[ 0 ] . user_id, t. bob_user_id( ) ) ;
353
352
assert_eq ! ( change1[ 0 ] . changed_to, IdentityState :: VerificationViolation ) ;
354
353
assert_eq ! ( change1. len( ) , 1 ) ;
355
354
356
355
// And the change when Bob became verified
357
- let change2 = next_change ( & mut changes ) . await ;
356
+ let change2 = assert_next_with_timeout ! ( stream ) ;
358
357
assert_eq ! ( change2[ 0 ] . user_id, t. bob_user_id( ) ) ;
359
358
assert_eq ! ( change2[ 0 ] . changed_to, IdentityState :: Verified ) ;
360
359
assert_eq ! ( change2. len( ) , 1 ) ;
@@ -369,13 +368,14 @@ mod tests {
369
368
t. unpin_bob ( ) . await ;
370
369
371
370
// And we are listening for identity changes
372
- let changes = t. subscribe_to_identity_status_changes ( ) . await ;
371
+ let stream = t. subscribe_to_identity_status_changes ( ) . await ;
372
+ pin_mut ! ( stream) ;
373
373
374
374
// When Bob joins the room
375
375
t. bob_joins ( ) . await ;
376
376
377
377
// Then we were notified about it
378
- let change = next_change ( & mut pin ! ( changes ) ) . await ;
378
+ let change = assert_next_with_timeout ! ( stream ) ;
379
379
assert_eq ! ( change[ 0 ] . user_id, t. bob_user_id( ) ) ;
380
380
assert_eq ! ( change[ 0 ] . changed_to, IdentityState :: PinViolation ) ;
381
381
assert_eq ! ( change. len( ) , 1 ) ;
@@ -391,13 +391,14 @@ mod tests {
391
391
t. unpin_bob ( ) . await ;
392
392
393
393
// And we are listening for identity changes
394
- let changes = t. subscribe_to_identity_status_changes ( ) . await ;
394
+ let stream = t. subscribe_to_identity_status_changes ( ) . await ;
395
+ pin_mut ! ( stream) ;
395
396
396
397
// When Bob joins the room
397
398
t. bob_joins ( ) . await ;
398
399
399
400
// Then we were notified about it
400
- let change = next_change ( & mut pin ! ( changes ) ) . await ;
401
+ let change = assert_next_with_timeout ! ( stream ) ;
401
402
assert_eq ! ( change[ 0 ] . user_id, t. bob_user_id( ) ) ;
402
403
assert_eq ! ( change[ 0 ] . changed_to, IdentityState :: VerificationViolation ) ;
403
404
assert_eq ! ( change. len( ) , 1 ) ;
@@ -412,7 +413,8 @@ mod tests {
412
413
t. verify_bob ( ) . await ;
413
414
414
415
// And we are listening for identity changes
415
- let changes = t. subscribe_to_identity_status_changes ( ) . await ;
416
+ let stream = t. subscribe_to_identity_status_changes ( ) . await ;
417
+ pin_mut ! ( stream) ;
416
418
417
419
// When Bob joins the room
418
420
t. bob_joins ( ) . await ;
@@ -421,8 +423,7 @@ mod tests {
421
423
t. unpin_bob ( ) . await ;
422
424
423
425
//// Then we were only notified about the unpin
424
- let mut changes = pin ! ( changes) ;
425
- let change = next_change ( & mut changes) . await ;
426
+ let change = assert_next_with_timeout ! ( stream) ;
426
427
assert_eq ! ( change[ 0 ] . user_id, t. bob_user_id( ) ) ;
427
428
assert_eq ! ( change[ 0 ] . changed_to, IdentityState :: VerificationViolation ) ;
428
429
assert_eq ! ( change. len( ) , 1 ) ;
@@ -437,15 +438,15 @@ mod tests {
437
438
t. pin_bob ( ) . await ;
438
439
439
440
// And we are listening for identity changes
440
- let changes = t. subscribe_to_identity_status_changes ( ) . await ;
441
- let mut changes = pin ! ( changes ) ;
441
+ let stream = t. subscribe_to_identity_status_changes ( ) . await ;
442
+ pin_mut ! ( stream ) ;
442
443
443
444
// When Bob joins the room
444
445
t. bob_joins ( ) . await ;
445
446
446
447
// Then there is no notification
447
448
tokio:: time:: sleep ( Duration :: from_millis ( 200 ) ) . await ;
448
- let change = changes . next ( ) . now_or_never ( ) ;
449
+ let change = stream . next ( ) . now_or_never ( ) ;
449
450
assert ! ( change. is_none( ) ) ;
450
451
}
451
452
@@ -458,20 +459,20 @@ mod tests {
458
459
t. unpin_bob ( ) . await ;
459
460
460
461
// And we are listening for identity changes
461
- let changes = t. subscribe_to_identity_status_changes ( ) . await ;
462
- let mut changes = pin ! ( changes ) ;
462
+ let stream = t. subscribe_to_identity_status_changes ( ) . await ;
463
+ pin_mut ! ( stream ) ;
463
464
464
465
// When Bob leaves the room
465
466
t. bob_leaves ( ) . await ;
466
467
467
468
// Then we were notified about the initial state of the room
468
- let change1 = next_change ( & mut changes ) . await ;
469
+ let change1 = assert_next_with_timeout ! ( stream ) ;
469
470
assert_eq ! ( change1[ 0 ] . user_id, t. bob_user_id( ) ) ;
470
471
assert_eq ! ( change1[ 0 ] . changed_to, IdentityState :: PinViolation ) ;
471
472
assert_eq ! ( change1. len( ) , 1 ) ;
472
473
473
474
// And we were notified about the change when the user left
474
- let change2 = next_change ( & mut changes ) . await ;
475
+ let change2 = assert_next_with_timeout ! ( stream ) ;
475
476
// Note: the user left the room, but we see that as them "becoming pinned" i.e.
476
477
// "you no longer need to notify about this user".
477
478
assert_eq ! ( change2[ 0 ] . user_id, t. bob_user_id( ) ) ;
@@ -488,36 +489,35 @@ mod tests {
488
489
t. unpin_bob ( ) . await ;
489
490
490
491
// And we are listening for identity changes
491
- let changes = t. subscribe_to_identity_status_changes ( ) . await ;
492
- let mut changes = pin ! ( changes ) ;
492
+ let stream = t. subscribe_to_identity_status_changes ( ) . await ;
493
+ pin_mut ! ( stream ) ;
493
494
494
495
// NOTE: below we pull the changes out of the subscription after each action.
495
- // This makes sure that the identity changes and membership changes are
496
- // properly ordered. If we pull them out later, the identity changes get
497
- // shifted forward because they rely on less-complex async stuff under
498
- // the hood. Calling next_change ends up winding the async
499
- // machinery sufficiently that the membership change and any subsequent events
500
- // have fully completed.
496
+ // This makes sure that the identity changes and membership changes are properly
497
+ // ordered. If we pull them out later, the identity changes get shifted forward
498
+ // because they rely on less-complex async stuff under the hood. Calling
499
+ // next_change ends up winding the async machinery sufficiently that the
500
+ // membership change and any subsequent events have fully completed.
501
501
502
502
// When Bob joins the room ...
503
503
t. bob_joins ( ) . await ;
504
- let change1 = next_change ( & mut changes ) . await ;
504
+ let change1 = assert_next_with_timeout ! ( stream ) ;
505
505
506
506
// ... becomes pinned ...
507
507
t. pin_bob ( ) . await ;
508
- let change2 = next_change ( & mut changes ) . await ;
508
+ let change2 = assert_next_with_timeout ! ( stream ) ;
509
509
510
510
// ... leaves and joins again (ignored since they stay pinned) ...
511
511
t. bob_leaves ( ) . await ;
512
512
t. bob_joins ( ) . await ;
513
513
514
514
// ... becomes unpinned ...
515
515
t. unpin_bob ( ) . await ;
516
- let change3 = next_change ( & mut changes ) . await ;
516
+ let change3 = assert_next_with_timeout ! ( stream ) ;
517
517
518
518
// ... and leaves.
519
519
t. bob_leaves ( ) . await ;
520
- let change4 = next_change ( & mut changes ) . await ;
520
+ let change4 = assert_next_with_timeout ! ( stream ) ;
521
521
522
522
assert_eq ! ( change1[ 0 ] . user_id, t. bob_user_id( ) ) ;
523
523
assert_eq ! ( change2[ 0 ] . user_id, t. bob_user_id( ) ) ;
@@ -542,10 +542,11 @@ mod tests {
542
542
t. unpin_bob ( ) . await ;
543
543
544
544
// When we start listening for identity changes
545
- let changes = t. subscribe_to_identity_status_changes ( ) . await ;
545
+ let stream = t. subscribe_to_identity_status_changes ( ) . await ;
546
+ pin_mut ! ( stream) ;
546
547
547
548
// Then we were immediately notified about Bob being unpinned
548
- let change = next_change ( & mut pin ! ( changes ) ) . await ;
549
+ let change = assert_next_with_timeout ! ( stream ) ;
549
550
assert_eq ! ( change[ 0 ] . user_id, t. bob_user_id( ) ) ;
550
551
assert_eq ! ( change[ 0 ] . changed_to, IdentityState :: PinViolation ) ;
551
552
assert_eq ! ( change. len( ) , 1 ) ;
@@ -558,34 +559,26 @@ mod tests {
558
559
t. verify_bob ( ) . await ;
559
560
560
561
// When we start listening for identity changes
561
- let changes = t. subscribe_to_identity_status_changes ( ) . await ;
562
+ let stream = t. subscribe_to_identity_status_changes ( ) . await ;
563
+ pin_mut ! ( stream) ;
562
564
563
565
// (And we unpin so that something is available in the changes stream)
564
566
t. unpin_bob ( ) . await ;
565
567
566
568
// Then we were only notified about the unpin, not being verified
567
- let change = next_change ( & mut pin ! ( changes) ) . await ;
568
- assert_eq ! ( change[ 0 ] . user_id, t. bob_user_id( ) ) ;
569
- assert_eq ! ( change[ 0 ] . changed_to, IdentityState :: VerificationViolation ) ;
570
- assert_eq ! ( change. len( ) , 1 ) ;
569
+ let next_change = assert_next_with_timeout ! ( stream) ;
570
+
571
+ assert_eq ! ( next_change[ 0 ] . user_id, t. bob_user_id( ) ) ;
572
+ assert_eq ! ( next_change[ 0 ] . changed_to, IdentityState :: VerificationViolation ) ;
573
+ assert_eq ! ( next_change. len( ) , 1 ) ;
571
574
}
572
575
573
576
// TODO: I (andyb) haven't figured out how to test room membership changes that
574
577
// affect our own user (they should not be shown). Specifically, I haven't
575
578
// figure out how to get out own user into a non-pinned state.
576
579
577
- async fn next_change (
578
- changes : & mut Pin < & mut Timeout < impl Stream < Item = Vec < IdentityStatusChange > > > > ,
579
- ) -> Vec < IdentityStatusChange > {
580
- changes
581
- . next ( )
582
- . await
583
- . expect ( "Should not reach end of changes stream" )
584
- . expect ( "Should not time out waiting for a change" )
585
- }
586
-
587
580
mod test_setup {
588
- use std:: time:: { Duration , SystemTime , UNIX_EPOCH } ;
581
+ use std:: time:: { SystemTime , UNIX_EPOCH } ;
589
582
590
583
use futures_core:: Stream ;
591
584
use matrix_sdk_base:: {
@@ -605,7 +598,6 @@ mod tests {
605
598
owned_user_id, OwnedUserId , TransactionId , UserId ,
606
599
} ;
607
600
use serde_json:: json;
608
- use tokio_stream:: { StreamExt as _, Timeout } ;
609
601
use wiremock:: {
610
602
matchers:: { header, method, path_regex} ,
611
603
Mock , MockServer , ResponseTemplate ,
@@ -786,12 +778,11 @@ mod tests {
786
778
787
779
pub ( super ) async fn subscribe_to_identity_status_changes (
788
780
& self ,
789
- ) -> Timeout < impl Stream < Item = Vec < IdentityStatusChange > > > {
781
+ ) -> impl Stream < Item = Vec < IdentityStatusChange > > {
790
782
self . room
791
783
. subscribe_to_identity_status_changes ( )
792
784
. await
793
785
. expect ( "Should be able to subscribe" )
794
- . timeout ( Duration :: from_secs ( 5 ) )
795
786
}
796
787
797
788
async fn init ( ) -> ( Client , OwnedUserId , SyncResponseBuilder ) {
0 commit comments