@@ -1831,7 +1831,7 @@ impl Container {
1831
1831
already_cancelled = true ;
1832
1832
1833
1833
let msg = CoordinatorMessage :: Kill ;
1834
- trace!( "processing { msg:?} ") ;
1834
+ trace!( msg_name = msg. as_ref ( ) , "processing ") ;
1835
1835
to_worker_tx. send( msg) . await . context( KillSnafu ) ?;
1836
1836
} ,
1837
1837
@@ -1847,12 +1847,12 @@ impl Container {
1847
1847
}
1848
1848
} ;
1849
1849
1850
- trace!( "processing { msg:?} ") ;
1850
+ trace!( msg_name = msg. as_ref ( ) , "processing ") ;
1851
1851
to_worker_tx. send( msg) . await . context( StdinSnafu ) ?;
1852
1852
} ,
1853
1853
1854
1854
Some ( container_msg) = from_worker_rx. recv( ) => {
1855
- trace!( "processing { container_msg:?} ") ;
1855
+ trace!( msg_name = container_msg. as_ref ( ) , "processing ") ;
1856
1856
1857
1857
match container_msg {
1858
1858
WorkerMessage :: ExecuteCommand ( resp) => {
@@ -2358,48 +2358,63 @@ impl Commander {
2358
2358
gc_interval. set_missed_tick_behavior ( MissedTickBehavior :: Delay ) ;
2359
2359
2360
2360
loop {
2361
- select ! {
2362
- command = command_rx. recv( ) => {
2363
- let Some ( ( ack_tx, command) ) = command else { break } ;
2361
+ enum Event {
2362
+ Command ( Option < ( oneshot:: Sender < ( ) > , DemultiplexCommand ) > ) ,
2364
2363
2364
+ FromWorker ( Option < Multiplexed < WorkerMessage > > ) ,
2365
+
2366
+ // Find any channels where the receivers have been
2367
+ // dropped and clear out the sending halves.
2368
+ Gc ,
2369
+ }
2370
+ use Event :: * ;
2371
+
2372
+ let event = select ! {
2373
+ command = command_rx. recv( ) => Command ( command) ,
2374
+
2375
+ msg = from_worker_rx. recv( ) => FromWorker ( msg) ,
2376
+
2377
+ _ = gc_interval. tick( ) => Gc ,
2378
+ } ;
2379
+
2380
+ match event {
2381
+ Command ( None ) => break ,
2382
+ Command ( Some ( ( ack_tx, command) ) ) => {
2365
2383
match command {
2366
2384
DemultiplexCommand :: Listen ( job_id, waiter) => {
2367
- trace!( "adding listener for {job_id:?} " ) ;
2385
+ trace ! ( job_id , "adding listener (many) " ) ;
2368
2386
let old = waiting. insert ( job_id, waiter) ;
2369
2387
ensure ! ( old. is_none( ) , DuplicateDemultiplexerClientSnafu { job_id } ) ;
2370
2388
}
2371
2389
2372
2390
DemultiplexCommand :: ListenOnce ( job_id, waiter) => {
2373
- trace!( "adding listener for {job_id:?} " ) ;
2391
+ trace ! ( job_id , "adding listener (once) " ) ;
2374
2392
let old = waiting_once. insert ( job_id, waiter) ;
2375
2393
ensure ! ( old. is_none( ) , DuplicateDemultiplexerClientSnafu { job_id } ) ;
2376
2394
}
2377
2395
}
2378
2396
2379
2397
ack_tx. send ( ( ) ) . ok ( /* Don't care about it */ ) ;
2380
- } ,
2381
-
2382
- msg = from_worker_rx. recv( ) => {
2383
- let Some ( Multiplexed ( job_id, msg) ) = msg else { break } ;
2398
+ }
2384
2399
2400
+ FromWorker ( None ) => break ,
2401
+ FromWorker ( Some ( Multiplexed ( job_id, msg) ) ) => {
2385
2402
if let Some ( waiter) = waiting_once. remove ( & job_id) {
2386
- trace!( "notifying listener for {job_id:?} " ) ;
2403
+ trace ! ( job_id , "notifying listener (once) " ) ;
2387
2404
waiter. send ( msg) . ok ( /* Don't care about it */ ) ;
2388
2405
continue ;
2389
2406
}
2390
2407
2391
2408
if let Some ( waiter) = waiting. get ( & job_id) {
2392
- trace!( "notifying listener for {job_id:?} " ) ;
2409
+ trace ! ( job_id , "notifying listener (many) " ) ;
2393
2410
waiter. send ( msg) . await . ok ( /* Don't care about it */ ) ;
2394
2411
continue ;
2395
2412
}
2396
2413
2397
- warn!( "no listener for {job_id:?} " ) ;
2414
+ warn ! ( job_id , "no listener to notify " ) ;
2398
2415
}
2399
2416
2400
- // Find any channels where the receivers have been
2401
- // dropped and clear out the sending halves.
2402
- _ = gc_interval. tick( ) => {
2417
+ Gc => {
2403
2418
waiting = mem:: take ( & mut waiting)
2404
2419
. into_iter ( )
2405
2420
. filter ( |( _job_id, tx) | !tx. is_closed ( ) )
0 commit comments