@@ -14,7 +14,7 @@ use association_internal::*;
14
14
use association_stats:: * ;
15
15
use bytes:: { Bytes , BytesMut } ;
16
16
use rand:: random;
17
- use tokio:: sync:: { broadcast, mpsc, Mutex , Semaphore } ;
17
+ use tokio:: sync:: { broadcast, mpsc, Mutex } ;
18
18
use util:: Conn ;
19
19
20
20
use crate :: chunk:: chunk_abort:: ChunkAbort ;
@@ -487,18 +487,6 @@ impl Association {
487
487
let done = Arc :: new ( AtomicBool :: new ( false ) ) ;
488
488
let name = Arc :: new ( name) ;
489
489
490
- let limit = {
491
- #[ cfg( test) ]
492
- {
493
- 1
494
- }
495
- #[ cfg( not( test) ) ]
496
- {
497
- 8
498
- }
499
- } ;
500
-
501
- let sem = Arc :: new ( Semaphore :: new ( limit) ) ;
502
490
while !done. load ( Ordering :: Relaxed ) {
503
491
//log::debug!("[{}] gather_outbound begin", name);
504
492
let ( packets, continue_loop) = {
@@ -507,35 +495,43 @@ impl Association {
507
495
} ;
508
496
//log::debug!("[{}] gather_outbound done with {}", name, packets.len());
509
497
510
- // We schedule a new task here for a reason:
511
- // If we don't tokio tends to run the write_loop and read_loop of one connection on the same OS thread
512
- // This means that even though we release the lock above, the read_loop isn't able to take it, simply because it is not being scheduled by tokio
513
- // Doing it this way, tokio schedules this to a new thread, this future is suspended, and the read_loop can make progress
514
498
let net_conn = Arc :: clone ( & net_conn) ;
515
499
let bytes_sent = Arc :: clone ( & bytes_sent) ;
516
500
let name2 = Arc :: clone ( & name) ;
517
501
let done2 = Arc :: clone ( & done) ;
518
- let sem = Arc :: clone ( & sem) ;
519
- sem. acquire ( ) . await . unwrap ( ) . forget ( ) ;
520
- tokio:: task:: spawn ( async move {
521
- let mut buf = BytesMut :: with_capacity ( 16 * 1024 ) ;
522
- for raw in packets {
523
- buf. clear ( ) ;
524
- if let Err ( err) = raw. marshal_to ( & mut buf) {
525
- log:: warn!( "[{}] failed to serialize a packet: {:?}" , name2, err) ;
526
- } else {
502
+ let mut buffer = None ;
503
+ for raw in packets {
504
+ let mut buf = buffer
505
+ . take ( )
506
+ . unwrap_or_else ( || BytesMut :: with_capacity ( 16 * 1024 ) ) ;
507
+
508
+ // We do the marshalling work in a blocking task here for a reason:
509
+ // If we don't tokio tends to run the write_loop and read_loop of one connection on the same OS thread
510
+ // This means that even though we release the lock above, the read_loop isn't able to take it, simply because it is not being scheduled by tokio
511
+ // Doing it this way, tokio schedules this work on a dedicated blocking thread, this future is suspended, and the read_loop can make progress
512
+ match tokio:: task:: spawn_blocking ( move || raw. marshal_to ( & mut buf) . map ( |_| buf) )
513
+ . await
514
+ . unwrap ( )
515
+ {
516
+ Ok ( mut buf) => {
527
517
let raw = buf. as_ref ( ) ;
528
518
if let Err ( err) = net_conn. send ( raw. as_ref ( ) ) . await {
529
519
log:: warn!( "[{}] failed to write packets on net_conn: {}" , name2, err) ;
530
520
done2. store ( true , Ordering :: Relaxed )
531
521
} else {
532
522
bytes_sent. fetch_add ( raw. len ( ) , Ordering :: SeqCst ) ;
533
523
}
524
+
525
+ // Reuse allocation. Have to use options, since spawn blocking can't borrow, has to take ownership.
526
+ buf. clear ( ) ;
527
+ buffer = Some ( buf) ;
528
+ }
529
+ Err ( err) => {
530
+ log:: warn!( "[{}] failed to serialize a packet: {:?}" , name2, err) ;
534
531
}
535
- //log::debug!("[{}] sending {} bytes done", name, raw.len());
536
532
}
537
- sem . add_permits ( 1 ) ;
538
- } ) ;
533
+ //log::debug!("[{}] sending {} bytes done", name, raw.len() );
534
+ }
539
535
540
536
if !continue_loop {
541
537
break ;
0 commit comments