@@ -492,11 +492,11 @@ export class Worker {
492
492
...compiledOptions ,
493
493
...( compiledOptions . workflowBundle && isCodeBundleOption ( compiledOptions . workflowBundle )
494
494
? {
495
- // Avoid dumping workflow bundle code to the console
496
- workflowBundle : < WorkflowBundle > {
497
- code : `<string of length ${ compiledOptions . workflowBundle . code . length } >` ,
498
- } ,
499
- }
495
+ // Avoid dumping workflow bundle code to the console
496
+ workflowBundle : < WorkflowBundle > {
497
+ code : `<string of length ${ compiledOptions . workflowBundle . code . length } >` ,
498
+ } ,
499
+ }
500
500
: { } ) ,
501
501
} ,
502
502
} ) ;
@@ -710,7 +710,7 @@ export class Worker {
710
710
) {
711
711
logger . warn (
712
712
'Ignoring WorkerOptions.interceptors.workflowModules because WorkerOptions.workflowBundle is set.\n' +
713
- 'To use workflow interceptors with a workflowBundle, pass them in the call to bundleWorkflowCode.'
713
+ 'To use workflow interceptors with a workflowBundle, pass them in the call to bundleWorkflowCode.'
714
714
) ;
715
715
}
716
716
@@ -883,8 +883,8 @@ export class Worker {
883
883
*/
884
884
protected pollLoop$ < T > ( pollFn : ( ) => Promise < T > ) : Observable < T > {
885
885
return from (
886
- ( async function * ( ) {
887
- for ( ; ; ) {
886
+ ( async function * ( ) {
887
+ for ( ; ; ) {
888
888
try {
889
889
yield await pollFn ( ) ;
890
890
} catch ( err ) {
@@ -919,14 +919,14 @@ export class Worker {
919
919
// so it can be cancelled if requested
920
920
let output :
921
921
| {
922
- type : 'result' ;
923
- result : coresdk . activity_result . IActivityExecutionResult ;
924
- }
922
+ type : 'result' ;
923
+ result : coresdk . activity_result . IActivityExecutionResult ;
924
+ }
925
925
| {
926
- type : 'run' ;
927
- activity : Activity ;
928
- input : ActivityExecuteInput ;
929
- }
926
+ type : 'run' ;
927
+ activity : Activity ;
928
+ input : ActivityExecuteInput ;
929
+ }
930
930
| { type : 'ignore' } ;
931
931
switch ( variant ) {
932
932
case 'start' : {
@@ -1112,38 +1112,44 @@ export class Worker {
1112
1112
*/
1113
1113
protected nexusOperator ( ) : OperatorFunction < NexusTaskWithBase64Token , Uint8Array > {
1114
1114
return pipe (
1115
- mergeMap ( async ( { task, base64TaskToken, protobufEncodedTask } ) : Promise < coresdk . nexus . INexusTaskCompletion | undefined > => {
1116
- const { variant } = task ;
1117
- if ( ! variant ) {
1118
- throw new TypeError ( 'Got a nexus task without a "variant" attribute' ) ;
1119
- }
1120
-
1121
- switch ( variant ) {
1122
- case 'task' : {
1123
- if ( task . task == null ) {
1124
- throw new IllegalStateError ( `Got empty task for task variant with token: ${ base64TaskToken } ` ) ;
1125
- }
1126
- return await this . handleNexusRunTask ( task . task , base64TaskToken , protobufEncodedTask ) ;
1115
+ mergeMap (
1116
+ async ( {
1117
+ task,
1118
+ base64TaskToken,
1119
+ protobufEncodedTask,
1120
+ } ) : Promise < coresdk . nexus . INexusTaskCompletion | undefined > => {
1121
+ const { variant } = task ;
1122
+ if ( ! variant ) {
1123
+ throw new TypeError ( 'Got a nexus task without a "variant" attribute' ) ;
1127
1124
}
1128
- case 'cancelTask' : {
1129
- const nexusHandler = this . taskTokenToNexusHandler . get ( base64TaskToken ) ;
1130
- if ( nexusHandler == null ) {
1131
- this . logger . trace ( 'Tried to cancel a non-existing nexus handler' , {
1132
- taskToken : base64TaskToken ,
1133
- } ) ;
1134
- break ;
1125
+
1126
+ switch ( variant ) {
1127
+ case 'task' : {
1128
+ if ( task . task == null ) {
1129
+ throw new IllegalStateError ( `Got empty task for task variant with token: ${ base64TaskToken } ` ) ;
1130
+ }
1131
+ return await this . handleNexusRunTask ( task . task , base64TaskToken , protobufEncodedTask ) ;
1135
1132
}
1136
- // NOTE: nexus handler will not be considered cancelled until it confirms cancellation (by throwing a CancelledFailure)
1137
- this . logger . trace ( 'Cancelling nexus handler' , nexusLogAttributes ( nexusHandler . info ) ) ;
1138
- let reason = 'unkown' ;
1139
- if ( task . cancelTask ?. reason != null ) {
1140
- reason = coresdk . nexus . NexusTaskCancelReason [ task . cancelTask . reason ] ;
1133
+ case 'cancelTask' : {
1134
+ const nexusHandler = this . taskTokenToNexusHandler . get ( base64TaskToken ) ;
1135
+ if ( nexusHandler == null ) {
1136
+ this . logger . trace ( 'Tried to cancel a non-existing nexus handler' , {
1137
+ taskToken : base64TaskToken ,
1138
+ } ) ;
1139
+ break ;
1140
+ }
1141
+ // NOTE: nexus handler will not be considered cancelled until it confirms cancellation (by throwing a CancelledFailure)
1142
+ this . logger . trace ( 'Cancelling nexus handler' , nexusLogAttributes ( nexusHandler . info ) ) ;
1143
+ let reason = 'unkown' ;
1144
+ if ( task . cancelTask ?. reason != null ) {
1145
+ reason = coresdk . nexus . NexusTaskCancelReason [ task . cancelTask . reason ] ;
1146
+ }
1147
+ nexusHandler . abortController . abort ( new CancelledFailure ( reason ) ) ;
1148
+ return { ackCancel : true , taskToken : task . cancelTask ?. taskToken } ;
1141
1149
}
1142
- nexusHandler . abortController . abort ( new CancelledFailure ( reason ) ) ;
1143
- return { ackCancel : true , taskToken : task . cancelTask ?. taskToken }
1144
1150
}
1145
1151
}
1146
- } ) ,
1152
+ ) ,
1147
1153
filter ( < T > ( result : T ) : result is Exclude < T , undefined > => result !== undefined ) ,
1148
1154
map ( ( result ) => coresdk . nexus . NexusTaskCompletion . encodeDelimited ( result ) . finish ( ) )
1149
1155
) ;
@@ -1278,9 +1284,9 @@ export class Worker {
1278
1284
const completion = synthetic
1279
1285
? undefined
1280
1286
: coresdk . workflow_completion . WorkflowActivationCompletion . encodeDelimited ( {
1281
- runId : activation . runId ,
1282
- successful : { } ,
1283
- } ) . finish ( ) ;
1287
+ runId : activation . runId ,
1288
+ successful : { } ,
1289
+ } ) . finish ( ) ;
1284
1290
return { state : undefined , output : { close, completion } } ;
1285
1291
}
1286
1292
0 commit comments