8
8
using Microsoft . AspNetCore . Connections ;
9
9
using Microsoft . AspNetCore . Connections . Features ;
10
10
using Microsoft . AspNetCore . Internal ;
11
- using Microsoft . AspNetCore . Server . Kestrel . Transport . Quic . Internal ;
12
11
using Microsoft . AspNetCore . InternalTesting ;
12
+ using Microsoft . AspNetCore . Server . Kestrel . Transport . Quic . Internal ;
13
13
using Microsoft . Extensions . Logging ;
14
14
using Microsoft . Extensions . Time . Testing ;
15
15
@@ -325,7 +325,7 @@ public async Task StreamPool_StreamAbortedOnServer_NotPooled()
325
325
var quicConnectionContext = Assert . IsType < QuicConnectionContext > ( serverConnection ) ;
326
326
Assert . Equal ( 0 , quicConnectionContext . StreamPool . Count ) ;
327
327
328
- var clientStream = await clientConnection . OpenOutboundStreamAsync ( QuicStreamType . Bidirectional ) ;
328
+ await using var clientStream = await clientConnection . OpenOutboundStreamAsync ( QuicStreamType . Bidirectional ) ;
329
329
await clientStream . WriteAsync ( TestData , completeWrites : true ) . DefaultTimeout ( ) ;
330
330
var serverStream = await serverConnection . AcceptAsync ( ) . DefaultTimeout ( ) ;
331
331
var readResult = await serverStream . Transport . Input . ReadAtLeastAsync ( TestData . Length ) . DefaultTimeout ( ) ;
@@ -368,7 +368,7 @@ public async Task StreamPool_StreamAbortedOnServerAfterComplete_NotPooled()
368
368
var quicConnectionContext = Assert . IsType < QuicConnectionContext > ( serverConnection ) ;
369
369
Assert . Equal ( 0 , quicConnectionContext . StreamPool . Count ) ;
370
370
371
- var clientStream = await clientConnection . OpenOutboundStreamAsync ( QuicStreamType . Bidirectional ) ;
371
+ await using var clientStream = await clientConnection . OpenOutboundStreamAsync ( QuicStreamType . Bidirectional ) ;
372
372
await clientStream . WriteAsync ( TestData , completeWrites : true ) . DefaultTimeout ( ) ;
373
373
var serverStream = await serverConnection . AcceptAsync ( ) . DefaultTimeout ( ) ;
374
374
var readResult = await serverStream . Transport . Input . ReadAtLeastAsync ( TestData . Length ) . DefaultTimeout ( ) ;
@@ -413,7 +413,7 @@ public async Task StreamPool_StreamAbortedOnClient_NotPooled()
413
413
var quicConnectionContext = Assert . IsType < QuicConnectionContext > ( serverConnection ) ;
414
414
Assert . Equal ( 0 , quicConnectionContext . StreamPool . Count ) ;
415
415
416
- var clientStream = await clientConnection . OpenOutboundStreamAsync ( QuicStreamType . Bidirectional ) ;
416
+ await using var clientStream = await clientConnection . OpenOutboundStreamAsync ( QuicStreamType . Bidirectional ) ;
417
417
await clientStream . WriteAsync ( TestData ) . DefaultTimeout ( ) ;
418
418
419
419
var serverStream = await serverConnection . AcceptAsync ( ) . DefaultTimeout ( ) ;
@@ -462,7 +462,7 @@ public async Task StreamPool_StreamAbortedOnClientAndServer_NotPooled()
462
462
var quicConnectionContext = Assert . IsType < QuicConnectionContext > ( serverConnection ) ;
463
463
Assert . Equal ( 0 , quicConnectionContext . StreamPool . Count ) ;
464
464
465
- var clientStream = await clientConnection . OpenOutboundStreamAsync ( QuicStreamType . Bidirectional ) ;
465
+ await using var clientStream = await clientConnection . OpenOutboundStreamAsync ( QuicStreamType . Bidirectional ) ;
466
466
await clientStream . WriteAsync ( TestData ) . DefaultTimeout ( ) ;
467
467
468
468
var serverStream = await serverConnection . AcceptAsync ( ) . DefaultTimeout ( ) ;
@@ -558,6 +558,8 @@ public async Task StreamPool_Heartbeat_ExpiredStreamRemoved()
558
558
public async Task StreamPool_ManyConcurrentStreams_StreamPoolFull ( )
559
559
{
560
560
// Arrange
561
+ using var httpEventSource = new HttpEventSourceListener ( LoggerFactory ) ;
562
+
561
563
await using var connectionListener = await QuicTestHelpers . CreateConnectionListenerFactory ( LoggerFactory ) ;
562
564
563
565
var options = QuicTestHelpers . CreateClientConnectionOptions ( connectionListener . EndPoint ) ;
@@ -580,52 +582,80 @@ public async Task StreamPool_ManyConcurrentStreams_StreamPoolFull()
580
582
const int StreamsSent = 101 ;
581
583
for ( var i = 0 ; i < StreamsSent ; i ++ )
582
584
{
583
- streamTasks . Add ( SendStream ( requestState ) ) ;
585
+ streamTasks . Add ( SendStream ( Logger , streamIndex : i , requestState ) ) ;
584
586
}
585
587
588
+ Logger . LogInformation ( "Waiting for all connections to be received by the server." ) ;
586
589
await allConnectionsOnServerTcs . Task . DefaultTimeout ( ) ;
587
590
pauseCompleteTcs . SetResult ( ) ;
588
591
592
+ Logger . LogInformation ( "Waiting for all stream tasks." ) ;
589
593
await Task . WhenAll ( streamTasks ) . DefaultTimeout ( ) ;
594
+ Logger . LogInformation ( "Stream tasks finished." ) ;
590
595
591
596
// Assert
592
597
// Up to 100 streams are pooled.
593
598
Assert . Equal ( 100 , quicConnectionContext . StreamPool . Count ) ;
594
599
595
- static async Task SendStream ( RequestState requestState )
600
+ static async Task SendStream ( ILogger logger , int streamIndex , RequestState requestState )
596
601
{
597
- var clientStream = await requestState . QuicConnection . OpenOutboundStreamAsync ( QuicStreamType . Bidirectional ) ;
598
- await clientStream . WriteAsync ( TestData , completeWrites : true ) . DefaultTimeout ( ) ;
599
- var serverStream = await requestState . ServerConnection . AcceptAsync ( ) . DefaultTimeout ( ) ;
600
- var readResult = await serverStream . Transport . Input . ReadAtLeastAsync ( TestData . Length ) . DefaultTimeout ( ) ;
601
- serverStream . Transport . Input . AdvanceTo ( readResult . Buffer . End ) ;
602
-
603
- // Input should be completed.
604
- readResult = await serverStream . Transport . Input . ReadAsync ( ) ;
605
- Assert . True ( readResult . IsCompleted ) ;
606
-
607
- lock ( requestState )
602
+ try
608
603
{
609
- requestState . ActiveConcurrentConnections ++ ;
610
- if ( requestState . ActiveConcurrentConnections == StreamsSent )
604
+ logger . LogInformation ( $ "{ StreamId ( streamIndex ) } : Client opening outbound stream.") ;
605
+ await using var clientStream = await requestState . QuicConnection . OpenOutboundStreamAsync ( QuicStreamType . Bidirectional ) ;
606
+ logger . LogInformation ( $ "{ StreamId ( streamIndex ) } : Client writing to stream.") ;
607
+ await clientStream . WriteAsync ( TestData , completeWrites : true ) . DefaultTimeout ( ) ;
608
+
609
+ logger . LogInformation ( $ "{ StreamId ( streamIndex ) } : Server accepting incoming stream.") ;
610
+ var serverStream = await requestState . ServerConnection . AcceptAsync ( ) . DefaultTimeout ( ) ;
611
+ logger . LogInformation ( $ "{ StreamId ( streamIndex ) } : Server reading data.") ;
612
+ var readResult = await serverStream . Transport . Input . ReadAtLeastAsync ( TestData . Length ) . DefaultTimeout ( ) ;
613
+ serverStream . Transport . Input . AdvanceTo ( readResult . Buffer . End ) ;
614
+
615
+ // Input should be completed.
616
+ logger . LogInformation ( $ "{ StreamId ( streamIndex ) } : Server verifying all data received.") ;
617
+ readResult = await serverStream . Transport . Input . ReadAsync ( ) ;
618
+ Assert . True ( readResult . IsCompleted ) ;
619
+
620
+ lock ( requestState )
611
621
{
612
- requestState . AllConnectionsOnServerTcs . SetResult ( ) ;
622
+ requestState . ActiveConcurrentConnections ++ ;
623
+
624
+ logger . LogInformation ( $ "{ StreamId ( streamIndex ) } : Increasing active concurrent connections to { requestState . ActiveConcurrentConnections } .") ;
625
+ if ( requestState . ActiveConcurrentConnections == StreamsSent )
626
+ {
627
+ logger . LogInformation ( $ "{ StreamId ( streamIndex ) } : All connections on server.") ;
628
+ requestState . AllConnectionsOnServerTcs . SetResult ( ) ;
629
+ }
613
630
}
614
- }
615
631
616
- await requestState . PauseCompleteTask ;
632
+ await requestState . PauseCompleteTask ;
617
633
618
- // Complete reading and writing.
619
- await serverStream . Transport . Input . CompleteAsync ( ) ;
620
- await serverStream . Transport . Output . CompleteAsync ( ) ;
634
+ // Complete reading and writing.
635
+ logger . LogInformation ( $ "{ StreamId ( streamIndex ) } : Server completing reading and writing.") ;
636
+ await serverStream . Transport . Input . CompleteAsync ( ) ;
637
+ await serverStream . Transport . Output . CompleteAsync ( ) ;
621
638
622
- var quicStreamContext = Assert . IsType < QuicStreamContext > ( serverStream ) ;
639
+ logger . LogInformation ( $ "{ StreamId ( streamIndex ) } : Client verifying all data received.") ;
640
+ var count = await clientStream . ReadAsync ( new byte [ 1024 ] ) ;
641
+ Assert . Equal ( 0 , count ) ;
623
642
624
- // Both send and receive loops have exited.
625
- await quicStreamContext . _processingTask . DefaultTimeout ( ) ;
626
- await quicStreamContext . DisposeAsync ( ) ;
627
- quicStreamContext . Dispose ( ) ;
643
+ logger . LogInformation ( $ "{ StreamId ( streamIndex ) } : Diposing { nameof ( QuicStreamContext ) } .") ;
644
+ var quicStreamContext = Assert . IsType < QuicStreamContext > ( serverStream ) ;
645
+
646
+ // Both send and receive loops have exited.
647
+ await quicStreamContext . _processingTask . DefaultTimeout ( ) ;
648
+ await quicStreamContext . DisposeAsync ( ) ;
649
+ quicStreamContext . Dispose ( ) ;
650
+ }
651
+ catch ( Exception ex )
652
+ {
653
+ logger . LogError ( ex , $ "{ StreamId ( streamIndex ) } : Error.") ;
654
+ throw ;
655
+ }
628
656
}
657
+
658
+ static string StreamId ( int index ) => $ "Stream-{ index } ";
629
659
}
630
660
631
661
[ ConditionalFact ]
@@ -644,7 +674,7 @@ public async Task PersistentState_StreamsReused_StatePersisted()
644
674
645
675
// Act
646
676
Logger . LogInformation ( "Client starting stream 1" ) ;
647
- var clientStream1 = await clientConnection . OpenOutboundStreamAsync ( QuicStreamType . Bidirectional ) ;
677
+ await using var clientStream1 = await clientConnection . OpenOutboundStreamAsync ( QuicStreamType . Bidirectional ) ;
648
678
await clientStream1 . WriteAsync ( TestData , completeWrites : true ) . DefaultTimeout ( ) ;
649
679
650
680
Logger . LogInformation ( "Server accept stream 1" ) ;
@@ -672,7 +702,7 @@ public async Task PersistentState_StreamsReused_StatePersisted()
672
702
quicStreamContext1 . Dispose ( ) ;
673
703
674
704
Logger . LogInformation ( "Client starting stream 2" ) ;
675
- var clientStream2 = await clientConnection . OpenOutboundStreamAsync ( QuicStreamType . Bidirectional ) ;
705
+ await using var clientStream2 = await clientConnection . OpenOutboundStreamAsync ( QuicStreamType . Bidirectional ) ;
676
706
await clientStream2 . WriteAsync ( TestData , completeWrites : true ) . DefaultTimeout ( ) ;
677
707
678
708
Logger . LogInformation ( "Server accept stream 2" ) ;
@@ -722,7 +752,7 @@ public async Task IProtocolErrorFeature_InvalidErrorCode(long errorCode)
722
752
await using var serverConnection = await connectionListener . AcceptAndAddFeatureAsync ( ) . DefaultTimeout ( ) ;
723
753
724
754
// Act
725
- var clientStream = await clientConnection . OpenOutboundStreamAsync ( QuicStreamType . Bidirectional ) ;
755
+ await using var clientStream = await clientConnection . OpenOutboundStreamAsync ( QuicStreamType . Bidirectional ) ;
726
756
await clientStream . WriteAsync ( TestData ) . DefaultTimeout ( ) ;
727
757
728
758
var serverStream = await serverConnection . AcceptAsync ( ) . DefaultTimeout ( ) ;
0 commit comments