Skip to content

Commit 576a12d

Browse files
authored
feat: Add startTime to WorkflowInfo, and more (#1031)
1 parent aec16cd commit 576a12d

File tree

8 files changed

+92
-34
lines changed

8 files changed

+92
-34
lines changed

packages/test/src/integration-tests.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,8 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
185185
if (
186186
!history?.events?.some(
187187
({ workflowTaskFailedEventAttributes }) =>
188-
workflowTaskFailedEventAttributes?.failure?.message === "'not-found' is not a function"
188+
workflowTaskFailedEventAttributes?.failure?.message ===
189+
"Failed to initialize workflow of type 'not-found': no such function is exported by the workflow bundle"
189190
)
190191
) {
191192
throw new Error('Cannot find workflow task failed event');
@@ -646,7 +647,8 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
646647
CustomKeywordField: ['test-value'],
647648
CustomIntField: [1, 2],
648649
CustomDatetimeField: [date.toISOString(), date.toISOString()],
649-
datetimeInstanceofWorks: [false],
650+
datetimeInstanceofWorks: [true],
651+
arrayInstanceofWorks: [true],
650652
datetimeType: ['Date'],
651653
});
652654
});
@@ -712,6 +714,8 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
712714
workflowType: 'returnWorkflowInfo',
713715
workflowId,
714716
historyLength: 3,
717+
startTime: result.startTime,
718+
runStartTime: result.runStartTime,
715719
// unsafe.now is a function, so doesn't make it through serialization, but .now is required, so we need to cast
716720
unsafe: { isReplaying: false } as UnsafeWorkflowInfo,
717721
});

packages/test/src/test-sinks.ts

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,34 +33,43 @@ if (RUN_INTEGRATION_TESTS) {
3333
fn: string;
3434
}
3535

36+
const dummyDate = new Date(2000, 1, 0, 0, 0, 0);
37+
function fixWorkflowInfoDates(input: WorkflowInfo): WorkflowInfo {
38+
return {
39+
...input,
40+
startTime: dummyDate,
41+
runStartTime: dummyDate,
42+
};
43+
}
44+
3645
const recordedCalls: RecordedCall[] = [];
3746
const taskQueue = `${__filename}-${t.title}`;
3847
const thrownErrors = Array<DependencyError>();
3948
const sinks: InjectedSinks<workflows.TestSinks> = {
4049
success: {
4150
runAsync: {
4251
async fn(info, counter) {
43-
recordedCalls.push({ info, counter, fn: 'success.runAsync' });
52+
recordedCalls.push({ info: fixWorkflowInfoDates(info), counter, fn: 'success.runAsync' });
4453
},
4554
},
4655
runSync: {
4756
fn(info, counter) {
48-
recordedCalls.push({ info, counter, fn: 'success.runSync' });
57+
recordedCalls.push({ info: fixWorkflowInfoDates(info), counter, fn: 'success.runSync' });
4958
},
5059
},
5160
},
5261
error: {
5362
throwAsync: {
5463
async fn(info, counter) {
55-
recordedCalls.push({ info, counter, fn: 'error.throwAsync' });
64+
recordedCalls.push({ info: fixWorkflowInfoDates(info), counter, fn: 'error.throwAsync' });
5665
const error = new DependencyError('error', 'throwAsync');
5766
thrownErrors.push(error);
5867
throw error;
5968
},
6069
},
6170
throwSync: {
6271
fn(info, counter) {
63-
recordedCalls.push({ info, counter, fn: 'error.throwSync' });
72+
recordedCalls.push({ info: fixWorkflowInfoDates(info), counter, fn: 'error.throwSync' });
6473
const error = new DependencyError('error', 'throwSync');
6574
thrownErrors.push(error);
6675
throw error;
@@ -102,6 +111,8 @@ if (RUN_INTEGRATION_TESTS) {
102111
parent: undefined,
103112
searchAttributes: {},
104113
historyLength: 3,
114+
startTime: dummyDate,
115+
runStartTime: dummyDate,
105116
// unsafe.now() doesn't make it through serialization, but .now is required, so we need to cast
106117
unsafe: { isReplaying: false } as UnsafeWorkflowInfo,
107118
};
@@ -114,7 +125,13 @@ if (RUN_INTEGRATION_TESTS) {
114125
]);
115126

116127
t.deepEqual(
117-
recordedLogs,
128+
recordedLogs.map((x) => ({
129+
...x,
130+
meta: {
131+
...x.meta,
132+
workflowInfo: fixWorkflowInfoDates(x.meta.workflowInfo),
133+
},
134+
})),
118135
thrownErrors.map((error) => ({
119136
level: 'ERROR',
120137
message: 'External sink function threw an error',

packages/test/src/test-workflows.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ async function createWorkflow(
9494
searchAttributes: {},
9595
historyLength: 3,
9696
unsafe: { isReplaying: false, now: Date.now },
97+
startTime: new Date(),
98+
runStartTime: new Date(),
9799
},
98100
randomnessSeed: Long.fromInt(1337).toBytes(),
99101
now: startTime,

packages/test/src/workflows/return-search-attributes.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,6 @@ export async function returnSearchAttributes(): Promise<SearchAttributes | undef
77
...sa,
88
datetimeType: [Object.getPrototypeOf(datetime).constructor.name],
99
datetimeInstanceofWorks: [datetime instanceof Date],
10+
arrayInstanceofWorks: [sa.CustomIntField instanceof Array],
1011
};
1112
}

packages/worker/src/worker-options.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -457,8 +457,8 @@ export function appendDefaultInterceptors(
457457
}
458458

459459
export function addDefaultWorkerOptions(options: WorkerOptions): WorkerOptionsWithDefaults {
460-
const { maxCachedWorkflows, showStackTraceSources, debugMode, namespace, ...rest } = options;
461-
460+
const { maxCachedWorkflows, showStackTraceSources, namespace, ...rest } = options;
461+
const debugMode = options.debugMode || isSet(process.env.TEMPORAL_DEBUG);
462462
return {
463463
namespace: namespace ?? 'default',
464464
identity: `${process.pid}@${os.hostname()}`,
@@ -484,6 +484,12 @@ export function addDefaultWorkerOptions(options: WorkerOptions): WorkerOptionsWi
484484
};
485485
}
486486

487+
function isSet(env: string | undefined): boolean {
488+
if (env === undefined) return false;
489+
env = env.toLocaleLowerCase();
490+
return env === '1' || env === 't' || env === 'true';
491+
}
492+
487493
export function compileWorkerOptions(opts: WorkerOptionsWithDefaults): CompiledWorkerOptions {
488494
return {
489495
...opts,

packages/worker/src/worker.ts

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ import {
5050
TASK_TOKEN_ATTR_KEY,
5151
} from '@temporalio/common/lib/otel';
5252
import { historyFromJSON } from '@temporalio/common/lib/proto-utils';
53-
import { optionalTsToDate, optionalTsToMs, tsToMs } from '@temporalio/common/lib/time';
53+
import { optionalTsToDate, optionalTsToMs, tsToDate, tsToMs } from '@temporalio/common/lib/time';
5454
import { errorMessage } from '@temporalio/common/lib/type-helpers';
5555
import * as native from '@temporalio/core-bridge';
5656
import { UnexpectedError } from '@temporalio/core-bridge';
@@ -254,7 +254,7 @@ function formatTaskToken(taskToken: Uint8Array) {
254254
/**
255255
* Notify that an activity has started, used as input to {@link Worker.activityHeartbeatSubject}
256256
*
257-
* Used to detect rouge activities.
257+
* Used to detect rogue activities.
258258
*/
259259
interface HeartbeatCreateNotification {
260260
type: 'create';
@@ -575,7 +575,10 @@ export class Worker {
575575
}
576576
const workflowCreator = await this.createWorkflowCreator(bundle, compiledOptions);
577577
const replayHandle = await nativeWorkerCtor.createReplay(addBuildIdIfMissing(compiledOptions, bundle.code));
578-
return [new this(replayHandle.worker, workflowCreator, compiledOptions), replayHandle.historyPusher];
578+
return [
579+
new this(replayHandle.worker, workflowCreator, compiledOptions, undefined, true),
580+
replayHandle.historyPusher,
581+
];
579582
}
580583

581584
/**
@@ -680,7 +683,8 @@ export class Worker {
680683
*/
681684
protected readonly workflowCreator: WorkflowCreator | undefined,
682685
public readonly options: CompiledWorkerOptions,
683-
protected readonly connection?: NativeConnection
686+
protected readonly connection?: NativeConnection,
687+
protected readonly isReplayWorker: boolean = false
684688
) {
685689
this.tracer = getTracer(options.enableSDKTracing);
686690
this.workflowCodecRunner = new WorkflowCodecRunner(options.loadedDataConverter.payloadCodecs);
@@ -1133,15 +1137,16 @@ export class Worker {
11331137
if (
11341138
!(
11351139
startWorkflow &&
1136-
startWorkflow.workflowId &&
1137-
startWorkflow.workflowType &&
1138-
startWorkflow.randomnessSeed
1140+
startWorkflow.workflowId != null &&
1141+
startWorkflow.workflowType != null &&
1142+
startWorkflow.randomnessSeed != null &&
1143+
startWorkflow.firstExecutionRunId != null &&
1144+
startWorkflow.attempt != null &&
1145+
startWorkflow.startTime != null
11391146
)
11401147
) {
11411148
throw new TypeError(
1142-
`Expected StartWorkflow with workflowId, workflowType and randomnessSeed, got ${JSON.stringify(
1143-
maybeStartWorkflow
1144-
)}`
1149+
`Malformed StartWorkflow activation: ${JSON.stringify(maybeStartWorkflow)}`
11451150
);
11461151
}
11471152
if (activation.timestamp == null) {
@@ -1168,12 +1173,6 @@ export class Worker {
11681173
searchAttributes,
11691174
} = startWorkflow;
11701175

1171-
if (firstExecutionRunId === null || firstExecutionRunId === undefined) {
1172-
throw new TypeError(`Unexpected value: \`firstExecutionRunId\` is ${firstExecutionRunId}`);
1173-
}
1174-
if (attempt === null || attempt === undefined) {
1175-
throw new TypeError(`Unexpected value: \`attempt\` is ${attempt}`);
1176-
}
11771176
const workflowInfo: WorkflowInfo = {
11781177
workflowId,
11791178
runId: activation.runId,
@@ -1198,6 +1197,8 @@ export class Worker {
11981197
namespace: this.options.namespace,
11991198
firstExecutionRunId,
12001199
continuedFromExecutionRunId: continuedFromExecutionRunId || undefined,
1200+
startTime: tsToDate(startWorkflow.startTime),
1201+
runStartTime: tsToDate(activation.timestamp),
12011202
executionTimeoutMs: optionalTsToMs(workflowExecutionTimeout),
12021203
executionExpirationTime: optionalTsToDate(workflowExecutionExpirationTime),
12031204
runTimeoutMs: optionalTsToMs(workflowRunTimeout),
@@ -1624,7 +1625,7 @@ export class Worker {
16241625
protected activity$(): Observable<void> {
16251626
// This Worker did not register any activities, return early
16261627
if (this.options.activities === undefined || Object.keys(this.options.activities).length === 0) {
1627-
this.log.warn('No activities registered, not polling for activity tasks');
1628+
if (!this.isReplayWorker) this.log.warn('No activities registered, not polling for activity tasks');
16281629
this.activityPollerStateSubject.next('SHUTDOWN');
16291630
return EMPTY;
16301631
}

packages/workflow/src/interfaces.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,15 @@ export interface WorkflowInfo {
8181
*/
8282
continuedFromExecutionRunId?: string;
8383

84-
// TODO expose from Core
8584
/**
86-
* Time at which the Workflow Run started
85+
* Time at which this [Workflow Execution Chain](https://docs.temporal.io/workflows#workflow-execution-chain) was started
8786
*/
88-
// startTime: Date;
87+
startTime: Date;
88+
89+
/**
90+
* Time at which the current Workflow Run started
91+
*/
92+
runStartTime: Date;
8993

9094
/**
9195
* Milliseconds after which the Workflow Execution is automatically terminated by Temporal Server. Set via {@link WorkflowOptions.workflowExecutionTimeout}.

packages/workflow/src/worker-interface.ts

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import type { coresdk } from '@temporalio/proto';
1010
import { storage } from './cancellation-scope';
1111
import { DeterminismViolationError } from './errors';
1212
import { WorkflowInterceptorsFactory } from './interceptors';
13-
import { WorkflowCreateOptionsWithSourceMap } from './interfaces';
13+
import { WorkflowCreateOptionsWithSourceMap, WorkflowInfo } from './interfaces';
1414
import { Activator, getActivator } from './internals';
1515
import { SinkCall } from './sinks';
1616

@@ -93,9 +93,9 @@ export function overrideGlobals(): void {
9393
* Sets required internal state and instantiates the workflow and interceptors.
9494
*/
9595
export function initRuntime(options: WorkflowCreateOptionsWithSourceMap): void {
96-
const { info } = options;
96+
const info: WorkflowInfo = fixPrototypes(options.info);
9797
info.unsafe.now = OriginalDate.now;
98-
const activator = new Activator(options);
98+
const activator = new Activator({ ...options, info });
9999
// There's on activator per workflow instance, set it globally on the context.
100100
// We do this before importing any user code so user code can statically reference @temporalio/workflow functions
101101
// as well as Date and Math.random.
@@ -126,7 +126,7 @@ export function initRuntime(options: WorkflowCreateOptionsWithSourceMap): void {
126126
const factory: WorkflowInterceptorsFactory = mod.interceptors;
127127
if (factory !== undefined) {
128128
if (typeof factory !== 'function') {
129-
throw new TypeError(`interceptors must be a function, got: ${factory}`);
129+
throw new TypeError(`Failed to initialize workflows interceptors: expected a function, but got: '${factory}'`);
130130
}
131131
const interceptors = factory();
132132
activator.interceptors.inbound.push(...(interceptors.inbound ?? []));
@@ -138,11 +138,34 @@ export function initRuntime(options: WorkflowCreateOptionsWithSourceMap): void {
138138
const mod = importWorkflows();
139139
const workflow = mod[info.workflowType];
140140
if (typeof workflow !== 'function') {
141-
throw new TypeError(`'${info.workflowType}' is not a function`);
141+
const details =
142+
workflow === undefined
143+
? 'no such function is exported by the workflow bundle'
144+
: `expected a function, but got: '${typeof info.workflowType}'`;
145+
throw new TypeError(`Failed to initialize workflow of type '${info.workflowType}': ${details}`);
142146
}
143147
activator.workflow = workflow;
144148
}
145149

150+
/**
151+
* Objects transfered to the VM from outside have prototypes belonging to the
152+
* outer context, which means that instanceof won't work inside the VM. This
153+
* function recursively walks over the content of an object, and recreate some
154+
* of these objects (notably Array, Date and Objects).
155+
*/
156+
function fixPrototypes<X>(obj: X): X {
157+
if (obj != null && typeof obj === 'object') {
158+
switch (Object.getPrototypeOf(obj)?.constructor?.name) {
159+
case 'Array':
160+
return Array.from((obj as Array<unknown>).map(fixPrototypes)) as X;
161+
case 'Date':
162+
return new Date(obj as unknown as Date) as X;
163+
default:
164+
return Object.fromEntries(Object.entries(obj).map(([k, v]): [string, any] => [k, fixPrototypes(v)])) as X;
165+
}
166+
} else return obj;
167+
}
168+
146169
/**
147170
* Run a chunk of activation jobs
148171
* @returns a boolean indicating whether job was processed or ignored

0 commit comments

Comments
 (0)