Skip to content

Commit e42fdb0

Browse files
authored
feat(workflow): Add workflowInfo().unsafe.now() (#882)
1 parent c2eed3a commit e42fdb0

File tree

9 files changed

+31
-18
lines changed

9 files changed

+31
-18
lines changed

packages/docs/docs/index.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ slug: /
55
---
66

77
- Temporal docs site: [`docs.temporal.io`](https://docs.temporal.io)
8-
- SDK docs: [`temporal.io/ts`](https://docs.temporal.io/typescript/introduction/)
8+
- SDK docs: [`t.mp/ts`](https://docs.temporal.io/typescript/introduction/)
99
- API reference: 👇
1010

1111
| Package | Description |

packages/test/src/integration-tests.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import { cleanOptionalStackTrace, u8 } from './helpers';
4242
import * as workflows from './workflows';
4343
import { withZeroesHTTPServer } from './zeroes-http-server';
4444
import { readFileSync } from 'node:fs';
45+
import { UnsafeWorkflowInfo } from '@temporalio/workflow/src/interfaces';
4546

4647
const { EVENT_TYPE_TIMER_STARTED, EVENT_TYPE_TIMER_FIRED, EVENT_TYPE_TIMER_CANCELED } =
4748
iface.temporal.api.enums.v1.EventType;
@@ -711,7 +712,8 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
711712
workflowType: 'returnWorkflowInfo',
712713
workflowId,
713714
historyLength: 3,
714-
unsafe: { isReplaying: false },
715+
// unsafe.now is a function, so doesn't make it through serialization, but .now is required, so we need to cast
716+
unsafe: { isReplaying: false } as UnsafeWorkflowInfo,
715717
});
716718
});
717719

packages/test/src/test-sinks.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import { WorkflowClient } from '@temporalio/client';
33
import { DefaultLogger, InjectedSinks, Runtime, Worker } from '@temporalio/worker';
44
import { WorkflowInfo } from '@temporalio/workflow';
5+
import { UnsafeWorkflowInfo } from '@temporalio/workflow/src/interfaces';
56
import test from 'ava';
67
import { v4 as uuid4 } from 'uuid';
78
import { RUN_INTEGRATION_TESTS } from './helpers';
@@ -101,7 +102,8 @@ if (RUN_INTEGRATION_TESTS) {
101102
parent: undefined,
102103
searchAttributes: {},
103104
historyLength: 3,
104-
unsafe: { isReplaying: false },
105+
// unsafe.now() doesn't make it through serialization, but .now is required, so we need to cast
106+
unsafe: { isReplaying: false } as UnsafeWorkflowInfo,
105107
};
106108

107109
t.deepEqual(recordedCalls, [

packages/test/src/test-workflows.ts

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
/* eslint-disable @typescript-eslint/no-non-null-assertion */
12
import {
23
ApplicationFailure,
34
defaultPayloadConverter,
@@ -57,7 +58,6 @@ test.after.always(async (t) => {
5758

5859
test.beforeEach(async (t) => {
5960
const { workflowCreator } = t.context;
60-
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
6161
const workflowType = t.title.match(/\S+$/)![0];
6262
const runId = t.title;
6363
const logs = new Array<unknown[]>();
@@ -93,7 +93,7 @@ async function createWorkflow(
9393
taskQueue: 'test',
9494
searchAttributes: {},
9595
historyLength: 3,
96-
unsafe: { isReplaying: false },
96+
unsafe: { isReplaying: false, now: Date.now },
9797
},
9898
randomnessSeed: Long.fromInt(1337).toBytes(),
9999
now: startTime,
@@ -338,9 +338,7 @@ function cleanWorkflowFailureStackTrace(
338338
req: coresdk.workflow_completion.WorkflowActivationCompletion,
339339
commandIndex = 0
340340
) {
341-
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
342341
req.successful!.commands![commandIndex].failWorkflowExecution!.failure!.stackTrace = cleanStackTrace(
343-
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
344342
req.successful!.commands![commandIndex].failWorkflowExecution!.failure!.stackTrace!
345343
);
346344
return req;
@@ -350,9 +348,7 @@ function cleanWorkflowQueryFailureStackTrace(
350348
req: coresdk.workflow_completion.WorkflowActivationCompletion,
351349
commandIndex = 0
352350
) {
353-
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
354351
req.successful!.commands![commandIndex].respondToQuery!.failed!.stackTrace = cleanStackTrace(
355-
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
356352
req.successful!.commands![commandIndex].respondToQuery!.failed!.stackTrace!
357353
);
358354
return req;
@@ -943,7 +939,6 @@ test('handleExternalWorkflowCancellationWhileActivityRunning', async (t) => {
943939
const url = 'https://temporal.io';
944940
const data = { content: 'new HTML content' };
945941
{
946-
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
947942
const completion = await activate(
948943
t,
949944
makeStartWorkflow(workflowType, toPayloads(defaultPayloadConverter, url, data) ?? [])

packages/worker/src/worker-options.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -156,11 +156,12 @@ export interface WorkerOptions {
156156
enableNonLocalActivities?: boolean;
157157

158158
/**
159-
* Limits the number of activities per second that this worker will process. The worker will
160-
* not poll for new activities if by doing so it might receive and execute an activity which
161-
* would cause it to exceed this limit. Must be a positive floating point number.
159+
* Limits the number of activities per second that this worker will process. The worker will not poll for new
160+
* activities if by doing so it might receive and execute an activity which would cause it to exceed this limit. Must
161+
* be a positive floating point number.
162162
*
163-
* If unset, no rate limiting will be applied to Worker's activities.
163+
* If unset, no rate limiting will be applied to Worker's activities. (`tctl task-queue describe` will display the
164+
* absence of a limit as 100,000.)
164165
*/
165166
maxActivitiesPerSecond?: number;
166167

packages/worker/src/worker.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1115,6 +1115,7 @@ export class Worker {
11151115
cronScheduleToScheduleInterval: optionalTsToMs(cronScheduleToScheduleInterval) || undefined,
11161116
historyLength: activation.historyLength,
11171117
unsafe: {
1118+
now: () => Date.now(), // re-set in initRuntime
11181119
isReplaying: activation.isReplaying,
11191120
},
11201121
};

packages/worker/src/workflow/threaded-vm.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,10 @@ export class VMWorkflowThreadProxy implements Workflow {
180180
workerThreadClient: WorkerThreadClient,
181181
options: WorkflowCreateOptions
182182
): Promise<VMWorkflowThreadProxy> {
183+
// Delete .now because functions can't be serialized / sent to thread.
184+
// Cast to any to avoid type error, since .now is a required field.
185+
// Safe to cast since we immediately set it inside the thread in initRuntime.
186+
delete (options.info.unsafe as any).now;
183187
await workerThreadClient.send({ type: 'create-workflow', options });
184188
return new this(workerThreadClient, options.info.runId);
185189
}

packages/workflow/src/interfaces.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,14 @@ export interface WorkflowInfo {
136136
* Never rely on this information in Workflow logic as it will cause non-deterministic behavior.
137137
*/
138138
export interface UnsafeWorkflowInfo {
139+
/**
140+
* Current system time in milliseconds
141+
*
142+
* The safe version of time is `new Date()` and `Date.now()`, which are set on the first invocation of a Workflow
143+
* Task.
144+
*/
145+
now(): number;
146+
139147
isReplaying: boolean;
140148
}
141149

packages/workflow/src/worker-interface.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,10 @@ export function setImportFuncs({ importWorkflows, importInterceptors }: ImportFu
3939
state.importInterceptors = importInterceptors;
4040
}
4141

42+
const global = globalThis as any;
43+
const OriginalDate = globalThis.Date;
44+
4245
export function overrideGlobals(): void {
43-
const global = globalThis as any;
4446
// Mock any weak reference because GC is non-deterministic and the effect is observable from the Workflow.
4547
// WeakRef is implemented in V8 8.4 which is embedded in node >=14.6.0.
4648
// Workflow developer will get a meaningful exception if they try to use these.
@@ -53,8 +55,6 @@ export function overrideGlobals(): void {
5355
);
5456
};
5557

56-
const OriginalDate = globalThis.Date;
57-
5858
global.Date = function (...args: unknown[]) {
5959
if (args.length > 0) {
6060
return new (OriginalDate as any)(...args);
@@ -120,7 +120,6 @@ export async function initRuntime({
120120
sourceMap,
121121
showStackTraceSources,
122122
}: WorkflowCreateOptionsWithSourceMap): Promise<void> {
123-
const global = globalThis as any;
124123
// Set the runId globally on the context so it can be retrieved in the case
125124
// of an unhandled promise rejection.
126125
global.__TEMPORAL__.runId = info.runId;
@@ -131,6 +130,7 @@ export async function initRuntime({
131130
};
132131

133132
state.info = info;
133+
state.info.unsafe.now = OriginalDate.now;
134134
state.now = now;
135135
state.random = alea(randomnessSeed);
136136
state.showStackTraceSources = showStackTraceSources;

0 commit comments

Comments
 (0)