@@ -69,6 +69,7 @@ export class RunExecution {
69
69
70
70
private lastHeartbeat ?: Date ;
71
71
private isShuttingDown = false ;
72
+ private shutdownReason ?: string ;
72
73
73
74
constructor ( opts : RunExecutionOptions ) {
74
75
this . id = randomBytes ( 4 ) . toString ( "hex" ) ;
@@ -81,11 +82,39 @@ export class RunExecution {
81
82
this . executionAbortController = new AbortController ( ) ;
82
83
}
83
84
85
+ /**
86
+ * Cancels the current execution by asking the task run process (if one exists) to cancel; throws if this execution has already shut down.
87
+ */
88
+ public async cancel ( ) : Promise < void > {
89
+ if ( this . isShuttingDown ) {
90
+ throw new Error ( "cancel called after execution shut down" ) ;
91
+ }
92
+
93
+ this . sendDebugLog ( "cancelling attempt" , { runId : this . runFriendlyId } ) ;
94
+
95
+ await this . taskRunProcess ?. cancel ( ) ;
96
+ }
97
+
98
+ /**
99
+ * Kills the current execution: sends SIGKILL to the task run process (if one exists), then calls shutdown unless `exitExecution` is false (it defaults to true).
100
+ */
101
+ public async kill ( { exitExecution = true } : { exitExecution ?: boolean } = { } ) {
102
+ await this . taskRunProcess ?. kill ( "SIGKILL" ) ;
103
+
104
+ if ( exitExecution ) {
105
+ this . shutdown ( "kill" ) ;
106
+ }
107
+ }
108
+
84
109
/**
85
110
* Prepares the execution with task run environment variables.
86
111
* This should be called before executing, typically after a successful run to prepare for the next one.
87
112
*/
88
113
public prepareForExecution ( opts : RunExecutionPrepareOptions ) : this {
114
+ if ( this . isShuttingDown ) {
115
+ throw new Error ( "prepareForExecution called after execution shut down" ) ;
116
+ }
117
+
89
118
if ( this . taskRunProcess ) {
90
119
throw new Error ( "prepareForExecution called after process was already created" ) ;
91
120
}
@@ -204,7 +233,8 @@ export class RunExecution {
204
233
205
234
if ( this . currentAttemptNumber && this . currentAttemptNumber !== run . attemptNumber ) {
206
235
this . sendDebugLog ( "error: attempt number mismatch" , snapshotMetadata ) ;
207
- await this . taskRunProcess ?. suspend ( ) ;
236
+ // This is a rogue execution, a new one will already have been created elsewhere
237
+ await this . exitTaskRunProcessWithoutFailingRun ( { flush : false } ) ;
208
238
return ;
209
239
}
210
240
@@ -237,14 +267,14 @@ export class RunExecution {
237
267
this . sendDebugLog ( "run was re-queued" , snapshotMetadata ) ;
238
268
239
269
// Pretend we've just suspended the run. This will kill the process without failing the run.
240
- await this . taskRunProcess ?. suspend ( ) ;
270
+ await this . exitTaskRunProcessWithoutFailingRun ( { flush : true } ) ;
241
271
return ;
242
272
}
243
273
case "FINISHED" : {
244
274
this . sendDebugLog ( "run is finished" , snapshotMetadata ) ;
245
275
246
276
// Pretend we've just suspended the run. This will kill the process without failing the run.
247
- await this . taskRunProcess ?. suspend ( ) ;
277
+ await this . exitTaskRunProcessWithoutFailingRun ( { flush : true } ) ;
248
278
return ;
249
279
}
250
280
case "QUEUED_EXECUTING" :
@@ -255,11 +285,11 @@ export class RunExecution {
255
285
return ;
256
286
}
257
287
case "SUSPENDED" : {
258
- this . sendDebugLog ( "run was suspended, kill the process " , snapshotMetadata ) ;
288
+ this . sendDebugLog ( "run was suspended" , snapshotMetadata ) ;
259
289
260
290
// This will kill the process and fail the execution with a SuspendedProcessError
261
- await this . taskRunProcess ?. suspend ( ) ;
262
-
291
+ // We don't flush because we already did before suspending
292
+ await this . exitTaskRunProcessWithoutFailingRun ( { flush : false } ) ;
263
293
return ;
264
294
}
265
295
case "PENDING_EXECUTING" : {
@@ -384,6 +414,10 @@ export class RunExecution {
384
414
* When this returns, the child process will have been cleaned up.
385
415
*/
386
416
public async execute ( runOpts : RunExecutionRunOptions ) : Promise < void > {
417
+ if ( this . isShuttingDown ) {
418
+ throw new Error ( "execute called after execution shut down" ) ;
419
+ }
420
+
387
421
// Setup initial state
388
422
this . runFriendlyId = runOpts . runFriendlyId ;
389
423
@@ -420,7 +454,7 @@ export class RunExecution {
420
454
if ( startError ) {
421
455
this . sendDebugLog ( "failed to start attempt" , { error : startError . message } ) ;
422
456
423
- this . stopServices ( ) ;
457
+ this . shutdown ( "failed to start attempt" ) ;
424
458
return ;
425
459
}
426
460
@@ -429,11 +463,12 @@ export class RunExecution {
429
463
if ( executeError ) {
430
464
this . sendDebugLog ( "failed to execute run" , { error : executeError . message } ) ;
431
465
432
- this . stopServices ( ) ;
466
+ this . shutdown ( "failed to execute run" ) ;
433
467
return ;
434
468
}
435
469
436
- this . stopServices ( ) ;
470
+ // This is here for safety; if shutdown already ran, this call only emits a debug log and returns
471
+ this . shutdown ( "execute call finished" ) ;
437
472
}
438
473
439
474
private async executeRunWrapper ( {
@@ -460,10 +495,7 @@ export class RunExecution {
460
495
} )
461
496
) ;
462
497
463
- this . sendDebugLog ( "run execution completed" , { error : executeError ?. message } ) ;
464
-
465
498
if ( ! executeError ) {
466
- this . stopServices ( ) ;
467
499
return ;
468
500
}
469
501
@@ -505,8 +537,6 @@ export class RunExecution {
505
537
if ( completeError ) {
506
538
this . sendDebugLog ( "failed to complete run" , { error : completeError . message } ) ;
507
539
}
508
-
509
- this . stopServices ( ) ;
510
540
}
511
541
512
542
private async executeRun ( {
@@ -527,7 +557,7 @@ export class RunExecution {
527
557
! this . taskRunProcess . isPreparedForNextAttempt
528
558
) {
529
559
this . sendDebugLog ( "killing existing task run process before executing next attempt" ) ;
530
- await this . kill ( ) . catch ( ( ) => { } ) ;
560
+ await this . kill ( { exitExecution : false } ) . catch ( ( ) => { } ) ;
531
561
}
532
562
533
563
// To skip this step and eagerly create the task run process, run prepareForExecution first
@@ -578,25 +608,6 @@ export class RunExecution {
578
608
}
579
609
}
580
610
581
- /**
582
- * Cancels the current execution.
583
- */
584
- public async cancel ( ) : Promise < void > {
585
- this . sendDebugLog ( "cancelling attempt" , { runId : this . runFriendlyId } ) ;
586
-
587
- await this . taskRunProcess ?. cancel ( ) ;
588
- }
589
-
590
- public exit ( ) {
591
- if ( this . taskRunProcess ?. isPreparedForNextRun ) {
592
- this . taskRunProcess ?. forceExit ( ) ;
593
- }
594
- }
595
-
596
- public async kill ( ) {
597
- await this . taskRunProcess ?. kill ( "SIGKILL" ) ;
598
- }
599
-
600
611
private async complete ( { completion } : { completion : TaskRunExecutionResult } ) : Promise < void > {
601
612
if ( ! this . runFriendlyId || ! this . snapshotManager ) {
602
613
throw new Error ( "cannot complete run: missing run or snapshot manager" ) ;
@@ -639,37 +650,32 @@ export class RunExecution {
639
650
640
651
const { attemptStatus } = result ;
641
652
642
- if ( attemptStatus === "RUN_FINISHED" ) {
643
- this . sendDebugLog ( "run finished" ) ;
644
-
645
- return ;
646
- }
647
-
648
- if ( attemptStatus === "RUN_PENDING_CANCEL" ) {
649
- this . sendDebugLog ( "run pending cancel" ) ;
650
- return ;
651
- }
653
+ switch ( attemptStatus ) {
654
+ case "RUN_FINISHED" :
655
+ case "RUN_PENDING_CANCEL" :
656
+ case "RETRY_QUEUED" : {
657
+ return ;
658
+ }
659
+ case "RETRY_IMMEDIATELY" : {
660
+ if ( attemptStatus !== "RETRY_IMMEDIATELY" ) {
661
+ return ;
662
+ }
652
663
653
- if ( attemptStatus === "RETRY_QUEUED" ) {
654
- this . sendDebugLog ( "retry queued" ) ;
664
+ if ( completion . ok ) {
665
+ throw new Error ( "Should retry but completion OK." ) ;
666
+ }
655
667
656
- return ;
657
- }
668
+ if ( ! completion . retry ) {
669
+ throw new Error ( "Should retry but missing retry params." ) ;
670
+ }
658
671
659
- if ( attemptStatus === "RETRY_IMMEDIATELY" ) {
660
- if ( completion . ok ) {
661
- throw new Error ( "Should retry but completion OK." ) ;
672
+ await this . retryImmediately ( { retryOpts : completion . retry } ) ;
673
+ return ;
662
674
}
663
-
664
- if ( ! completion . retry ) {
665
- throw new Error ( "Should retry but missing retry params." ) ;
675
+ default : {
676
+ assertExhaustive ( attemptStatus ) ;
666
677
}
667
-
668
- await this . retryImmediately ( { retryOpts : completion . retry } ) ;
669
- return ;
670
678
}
671
-
672
- assertExhaustive ( attemptStatus ) ;
673
679
}
674
680
675
681
private updateSnapshotAfterCompletion ( snapshotId : string , status : TaskRunExecutionStatus ) {
@@ -752,7 +758,7 @@ export class RunExecution {
752
758
if ( startError ) {
753
759
this . sendDebugLog ( "failed to start attempt for retry" , { error : startError . message } ) ;
754
760
755
- this . stopServices ( ) ;
761
+ this . shutdown ( "retryImmediately: failed to start attempt" ) ;
756
762
return ;
757
763
}
758
764
@@ -761,11 +767,9 @@ export class RunExecution {
761
767
if ( executeError ) {
762
768
this . sendDebugLog ( "failed to execute run for retry" , { error : executeError . message } ) ;
763
769
764
- this . stopServices ( ) ;
770
+ this . shutdown ( "retryImmediately: failed to execute run" ) ;
765
771
return ;
766
772
}
767
-
768
- this . stopServices ( ) ;
769
773
}
770
774
771
775
/**
@@ -797,10 +801,17 @@ export class RunExecution {
797
801
this . restoreCount ++ ;
798
802
}
799
803
804
+ private async exitTaskRunProcessWithoutFailingRun ( { flush } : { flush : boolean } ) {
805
+ await this . taskRunProcess ?. suspend ( { flush } ) ;
806
+
807
+ // No services should be left running after the suspend call - shutdown() stops the snapshot poller and cleans up the snapshot manager
808
+ this . shutdown ( "exitTaskRunProcessWithoutFailingRun" ) ;
809
+ }
810
+
800
811
/**
801
812
* Processes env overrides from the metadata service. Generally called when we're resuming from a suspended state.
802
813
*/
803
- async processEnvOverrides ( reason ?: string ) {
814
+ public async processEnvOverrides ( reason ?: string ) {
804
815
if ( ! this . env . TRIGGER_METADATA_URL ) {
805
816
this . sendDebugLog ( "no metadata url, skipping env overrides" , { reason } ) ;
806
817
return ;
@@ -953,15 +964,21 @@ export class RunExecution {
953
964
}
954
965
955
966
this . executionAbortController . abort ( ) ;
956
- this . stopServices ( ) ;
967
+ this . shutdown ( "abortExecution" ) ;
957
968
}
958
969
959
- private stopServices ( ) {
970
+ private shutdown ( reason : string ) {
960
971
if ( this . isShuttingDown ) {
972
+ this . sendDebugLog ( `[shutdown] ${ reason } (already shutting down)` , {
973
+ firstShutdownReason : this . shutdownReason ,
974
+ } ) ;
961
975
return ;
962
976
}
963
977
978
+ this . sendDebugLog ( `[shutdown] ${ reason } ` ) ;
979
+
964
980
this . isShuttingDown = true ;
981
+ this . shutdownReason = reason ;
965
982
966
983
this . snapshotPoller ?. stop ( ) ;
967
984
this . snapshotManager ?. cleanup ( ) ;
0 commit comments