@@ -17,7 +17,7 @@ use self::id_allocator::IdAllocator;
17
17
use anyhow:: Context as _;
18
18
use bytes:: Bytes ;
19
19
use jmux_proto:: { ChannelData , DistantChannelId , Header , LocalChannelId , Message , ReasonCode } ;
20
- use std:: collections:: HashMap ;
20
+ use std:: collections:: { HashMap , HashSet } ;
21
21
use std:: convert:: TryFrom ;
22
22
use std:: io;
23
23
use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
@@ -30,16 +30,6 @@ use tokio::task::JoinHandle;
30
30
use tokio_util:: codec:: FramedRead ;
31
31
use tracing:: { Instrument as _, Span } ;
32
32
33
- // PERF/FIXME: changing this parameter to 16 * 1024 greatly improves the throughput,
34
- // but we need to wait until 2025 before making this change.
35
- //
36
- // iperf result for 4 * 1024:
37
- // > 0.0000-10.0490 sec 23.0 GBytes 19.7 Gbits/sec
38
- //
39
- // iperf result for 16 * 1024:
40
- // > 0.0000-10.0393 sec 30.6 GBytes 26.2 Gbits/sec
41
- //
42
- // This is an improvement of ~32.9%.
43
33
const MAXIMUM_PACKET_SIZE_IN_BYTES : u16 = 4 * 1024 ; // 4 kiB
44
34
const WINDOW_ADJUSTMENT_THRESHOLD : u32 = 4 * 1024 ; // 4 kiB
45
35
@@ -323,14 +313,13 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
323
313
let mut jmux_ctx = JmuxCtx :: new ( ) ;
324
314
let mut data_senders: HashMap < LocalChannelId , DataSender > = HashMap :: new ( ) ;
325
315
let mut pending_channels: HashMap < LocalChannelId , ( DestinationUrl , ApiResponseSender ) > = HashMap :: new ( ) ;
316
+ let mut needs_window_adjustment: HashSet < LocalChannelId > = HashSet :: new ( ) ;
326
317
let ( internal_msg_tx, mut internal_msg_rx) = mpsc:: unbounded_channel :: < InternalMessage > ( ) ;
327
318
328
319
// Safety net against poor AsyncRead trait implementations.
329
320
const MAX_CONSECUTIVE_PIPE_FAILURES : u8 = 5 ;
330
321
let mut nb_consecutive_pipe_failures = 0 ;
331
322
332
- let mut needs_window_adjustment = false ;
333
-
334
323
loop {
335
324
// NOTE: Current task is the "jmux scheduler" or "jmux orchestrator".
336
325
// It handles the JMUX context and communicates with other tasks.
@@ -368,7 +357,7 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
368
357
if let Some ( leftover) = leftover {
369
358
if let Err ( error) = msg_to_send_tx. send( Message :: data( channel. distant_id, leftover) ) {
370
359
error!( %error, "Couldn't send leftover bytes" ) ;
371
- } ;
360
+ }
372
361
}
373
362
374
363
let ( reader, writer) = stream. into_split( ) ;
@@ -646,7 +635,7 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
646
635
647
636
let _ = data_tx. send( msg. transfer_data) ;
648
637
649
- needs_window_adjustment = true ;
638
+ needs_window_adjustment. insert ( id ) ;
650
639
}
651
640
Message :: Eof ( msg) => {
652
641
// Per the spec:
@@ -722,24 +711,22 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
722
711
}
723
712
}
724
713
}
725
- _ = core:: future:: ready( ( ) ) , if needs_window_adjustment => {
726
- for channel in jmux_ctx. channels. values_mut( ) {
714
+ _ = core:: future:: ready( ( ) ) , if !needs_window_adjustment. is_empty( ) => {
715
+ for channel_id in needs_window_adjustment. drain( ) {
716
+ let Some ( channel) = jmux_ctx. get_channel_mut( channel_id) else {
717
+ continue ;
718
+ } ;
719
+
727
720
let window_adjustment = channel. initial_window_size - channel. remote_window_size;
728
721
729
722
if window_adjustment > WINDOW_ADJUSTMENT_THRESHOLD {
730
- channel. span. in_scope( || {
731
- trace!( %channel. distant_id, "Send WindowAdjust message" ) ;
732
- } ) ;
733
-
734
723
msg_to_send_tx
735
724
. send( Message :: window_adjust( channel. distant_id, window_adjustment) )
736
725
. context( "couldn’t send WINDOW ADJUST message" ) ?;
737
726
738
727
channel. remote_window_size = channel. initial_window_size;
739
728
}
740
729
}
741
-
742
- needs_window_adjustment = false ;
743
730
}
744
731
}
745
732
}
@@ -820,7 +807,7 @@ impl DataReaderTask {
820
807
trace ! (
821
808
window_size_now,
822
809
chunk_length = chunk. len( ) ,
823
- "Window size insufficient to send full chunk. Truncate and wait. "
810
+ "Window size insufficient to send full chunk; truncate and wait"
824
811
) ;
825
812
826
813
if window_size_now > 0 {
0 commit comments