@@ -371,25 +371,55 @@ async fn handle_core(
371
371
let mut active_execution_gc_interval = time:: interval ( Duration :: from_secs ( 30 ) ) ;
372
372
373
373
loop {
374
- tokio:: select! {
375
- request = socket. recv( ) => {
374
+ use Event :: * ;
375
+
376
+ enum Event {
377
+ Request ( Option < Result < Message , axum:: Error > > ) ,
378
+ Response ( Option < Result < MessageResponse , TaggedError > > ) ,
379
+ Task ( Result < Result < ( ) , TaggedError > , tokio:: task:: JoinError > ) ,
380
+ GarbageCollection ,
381
+ IdleTimeout ,
382
+ IdleRequest ,
383
+ SessionTimeout ,
384
+ }
385
+
386
+ let event = tokio:: select! {
387
+ request = socket. recv( ) => Request ( request) ,
388
+
389
+ resp = rx. recv( ) => Response ( resp) ,
390
+
391
+ // We don't care if there are no running tasks
392
+ Some ( task) = manager. join_next( ) => Task ( task) ,
393
+
394
+ _ = active_execution_gc_interval. tick( ) => GarbageCollection ,
395
+
396
+ _ = & mut idle_timeout, if manager. is_empty( ) => IdleTimeout ,
397
+
398
+ _ = factory. container_requested( ) , if manager. is_empty( ) => IdleRequest ,
399
+
400
+ _ = & mut session_timeout => SessionTimeout ,
401
+ } ;
402
+
403
+ match event {
404
+ Request ( request) => {
376
405
metrics:: WS_INCOMING . inc ( ) ;
377
406
378
407
match request {
379
- None => {
380
- // browser disconnected
381
- break ;
382
- }
383
- Some ( Ok ( Message :: Text ( txt) ) ) => handle_msg( & txt, & tx, & mut manager, & mut active_executions, & db) . await ,
384
- Some ( Ok ( _) ) => {
385
- // unknown message type
386
- continue ;
408
+ // browser disconnected
409
+ None => break ,
410
+
411
+ Some ( Ok ( Message :: Text ( txt) ) ) => {
412
+ handle_msg ( & txt, & tx, & mut manager, & mut active_executions, & db) . await
387
413
}
414
+
415
+ // unknown message type
416
+ Some ( Ok ( _) ) => continue ,
417
+
388
418
Some ( Err ( e) ) => super :: record_websocket_error ( e. to_string ( ) ) ,
389
419
}
390
- } ,
420
+ }
391
421
392
- resp = rx . recv ( ) => {
422
+ Response ( resp ) => {
393
423
let resp = resp. expect ( "The rx should never close as we have a tx" ) ;
394
424
395
425
let success = resp. is_ok ( ) ;
@@ -403,10 +433,9 @@ async fn handle_core(
403
433
404
434
let success = if success { "true" } else { "false" } ;
405
435
metrics:: WS_OUTGOING . with_label_values ( & [ success] ) . inc ( ) ;
406
- } ,
436
+ }
407
437
408
- // We don't care if there are no running tasks
409
- Some ( task) = manager. join_next( ) => {
438
+ Task ( task) => {
410
439
// The last task has completed which means we are a
411
440
// candidate for idling in a little while.
412
441
if manager. is_empty ( ) {
@@ -415,17 +444,21 @@ async fn handle_core(
415
444
416
445
let ( error, meta) = match task {
417
446
Ok ( Ok ( ( ) ) ) => continue ,
447
+
418
448
Ok ( Err ( error) ) => error,
449
+
419
450
Err ( error) => {
420
451
// The task was cancelled; no need to report
421
- let Ok ( panic) = error. try_into_panic( ) else { continue } ;
452
+ let Ok ( panic) = error. try_into_panic ( ) else {
453
+ continue ;
454
+ } ;
422
455
423
456
let text = match panic. downcast :: < String > ( ) {
424
457
Ok ( text) => * text,
425
458
Err ( panic) => match panic. downcast :: < & str > ( ) {
426
459
Ok ( text) => text. to_string ( ) ,
427
460
_ => "An unknown panic occurred" . into ( ) ,
428
- }
461
+ } ,
429
462
} ;
430
463
( WebSocketTaskPanicSnafu { text } . build ( ) , None )
431
464
}
@@ -435,32 +468,30 @@ async fn handle_core(
435
468
// We can't send a response
436
469
break ;
437
470
}
438
- } ,
471
+ }
439
472
440
- _ = active_execution_gc_interval . tick ( ) => {
473
+ GarbageCollection => {
441
474
active_executions = mem:: take ( & mut active_executions)
442
475
. into_iter ( )
443
476
. filter ( |( _id, ( _, tx) ) | tx. as_ref ( ) . is_some_and ( |tx| !tx. is_closed ( ) ) )
444
477
. collect ( ) ;
445
- } ,
478
+ }
446
479
447
- _ = & mut idle_timeout , if manager . is_empty ( ) => {
480
+ IdleTimeout => {
448
481
if handle_idle ( & mut manager, & tx) . await . is_break ( ) {
449
- break
482
+ break ;
450
483
}
451
- } ,
484
+ }
452
485
453
- _ = factory . container_requested ( ) , if manager . is_empty ( ) => {
486
+ IdleRequest => {
454
487
info ! ( "Container requested to idle" ) ;
455
488
456
489
if handle_idle ( & mut manager, & tx) . await . is_break ( ) {
457
- break
490
+ break ;
458
491
}
459
- } ,
460
-
461
- _ = & mut session_timeout => {
462
- break ;
463
492
}
493
+
494
+ SessionTimeout => break ,
464
495
}
465
496
}
466
497
0 commit comments