Skip to content

Commit fa852b8

Browse files
authored
feat: Support history from JSON (#743)
1 parent 027a052 commit fa852b8

File tree

12 files changed

+406
-26
lines changed

12 files changed

+406
-26
lines changed

packages/common/src/converter/patch-protobuf-root.ts

Lines changed: 0 additions & 1 deletion
This file was deleted.

packages/common/src/proto-utils.ts

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import proto from '@temporalio/proto';
2+
import { fromProto3JSON, toProto3JSON } from 'proto3-json-serializer';
3+
import { patchProtobufRoot } from '@temporalio/proto/lib/patch-protobuf-root';
4+
5+
export type History = proto.temporal.api.history.v1.IHistory;
6+
export type Payload = proto.temporal.api.common.v1.IPayload;
7+
8+
// Cast to any because the generated proto module types are missing the lookupType method
9+
const patched = patchProtobufRoot(proto) as any;
10+
const historyType = patched.lookupType('temporal.api.history.v1.History');
11+
const payloadType = patched.lookupType('temporal.api.common.v1.Payload');
12+
13+
function pascalCaseToConstantCase(s: string) {
14+
return s.replace(/[^\b][A-Z]/g, (m) => `${m[0]}_${m[1]}`).toUpperCase();
15+
}
16+
17+
function fixEnumValue<O extends Record<string, any>>(obj: O, attr: keyof O, prefix: string) {
18+
return (
19+
obj[attr] && {
20+
[attr]: `${prefix}_${pascalCaseToConstantCase(obj[attr])}`,
21+
}
22+
);
23+
}
24+
25+
function fixHistoryEvent(e: Record<string, any>) {
26+
const type = Object.keys(e).find((k) => k.endsWith('EventAttributes'));
27+
if (!type) {
28+
throw new TypeError(`Missing attributes in history event: ${JSON.stringify(e)}`);
29+
}
30+
31+
return {
32+
...e,
33+
...fixEnumValue(e, 'eventType', 'EVENT_TYPE'),
34+
[type]: {
35+
...e[type],
36+
...(e[type].taskQueue && {
37+
taskQueue: { ...e[type].taskQueue, ...fixEnumValue(e[type].taskQueue, 'kind', 'TASK_QUEUE_KIND') },
38+
}),
39+
...fixEnumValue(e[type], 'parentClosePolicy', 'PARENT_CLOSE_POLICY'),
40+
...fixEnumValue(e[type], 'workflowIdReusePolicy', 'WORKFLOW_ID_REUSE_POLICY'),
41+
...fixEnumValue(e[type], 'initiator', 'CONTINUE_AS_NEW_INITIATOR'),
42+
...fixEnumValue(e[type], 'retryState', 'RETRY_STATE'),
43+
...(e[type].childWorkflowExecutionFailureInfo && {
44+
childWorkflowExecutionFailureInfo: {
45+
...e[type].childWorkflowExecutionFailureInfo,
46+
...fixEnumValue(e[type].childWorkflowExecutionFailureInfo, 'retryState', 'RETRY_STATE'),
47+
},
48+
}),
49+
},
50+
};
51+
}
52+
53+
function fixHistory(h: Record<string, any>) {
54+
return {
55+
events: h.events.map(fixHistoryEvent),
56+
};
57+
}
58+
59+
/**
60+
* Convert a proto JSON representation of History to a valid History object
61+
*/
62+
export function historyFromJSON(history: unknown): History {
63+
if (typeof history !== 'object' || history == null || !Array.isArray((history as any).events)) {
64+
throw new TypeError('Invalid history, expected an object with an array of events');
65+
}
66+
const loaded = fromProto3JSON(historyType, fixHistory(history));
67+
if (loaded === null) {
68+
throw new TypeError('Invalid history');
69+
}
70+
return loaded as any;
71+
}
72+
73+
/**
74+
* JSON representation of Temporal's {@link Payload} protobuf object
75+
*/
76+
export interface JSONPayload {
77+
/**
78+
* Mapping of key to base64 encoded value
79+
*/
80+
metadata?: Record<string, string> | null;
81+
/**
82+
* base64 encoded value
83+
*/
84+
data?: string | null;
85+
}
86+
87+
/**
88+
* Convert from protobuf payload to JSON
89+
*/
90+
export function payloadToJSON(payload: Payload): JSONPayload {
91+
return toProto3JSON(patched.temporal.api.common.v1.Payload.create(payload)) as any;
92+
}
93+
94+
/**
95+
* Convert from JSON to protobuf payload
96+
*/
97+
export function JSONToPayload(json: JSONPayload): Payload {
98+
const loaded = fromProto3JSON(payloadType, json as any);
99+
if (loaded === null) {
100+
throw new TypeError('Invalid payload');
101+
}
102+
return loaded as any;
103+
}

packages/common/src/protobufs.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@
1212

1313
// Don't export from index, so we save space in Workflow bundles of users who don't use Protobufs
1414
export * from './converter/protobuf-payload-converters';
15-
export * from './converter/patch-protobuf-root';
15+
export { patchProtobufRoot } from '@temporalio/proto/lib/patch-protobuf-root';
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
{
2+
"events": [
3+
{
4+
"eventId": "1",
5+
"eventTime": "2022-07-06T00:33:05.000Z",
6+
"eventType": "WorkflowExecutionStarted",
7+
"version": "0",
8+
"taskId": "1056764",
9+
"workflowExecutionStartedEventAttributes": {
10+
"workflowType": { "name": "cancelFakeProgress" },
11+
"parentWorkflowNamespace": "",
12+
"parentInitiatedEventId": "0",
13+
"taskQueue": { "name": "test", "kind": "Normal" },
14+
"input": { "payloads": [] },
15+
"workflowTaskTimeout": "10s",
16+
"continuedExecutionRunId": "",
17+
"initiator": "Unspecified",
18+
"originalExecutionRunId": "bc761765-7fca-4d3e-89ff-0fa49379dc7a",
19+
"identity": "61356@Roey-Bermans-MacBook-Pro.local",
20+
"firstExecutionRunId": "bc761765-7fca-4d3e-89ff-0fa49379dc7a",
21+
"attempt": 1,
22+
"cronSchedule": "",
23+
"firstWorkflowTaskBackoff": "0s",
24+
"header": { "fields": {} }
25+
}
26+
},
27+
{
28+
"eventId": "2",
29+
"eventTime": "2022-07-06T00:33:05.000Z",
30+
"eventType": "WorkflowTaskScheduled",
31+
"version": "0",
32+
"taskId": "1056765",
33+
"workflowTaskScheduledEventAttributes": {
34+
"taskQueue": { "name": "test", "kind": "Normal" },
35+
"startToCloseTimeout": "10s",
36+
"attempt": 1
37+
}
38+
},
39+
{
40+
"eventId": "3",
41+
"eventTime": "2022-07-06T00:33:16.000Z",
42+
"eventType": "WorkflowTaskStarted",
43+
"version": "0",
44+
"taskId": "1057182",
45+
"workflowTaskStartedEventAttributes": {
46+
"scheduledEventId": "2",
47+
"identity": "61356@Roey-Bermans-MacBook-Pro.local",
48+
"requestId": "44b458e4-c800-4335-83d0-e59704cdf3bc"
49+
}
50+
},
51+
{
52+
"eventId": "4",
53+
"eventTime": "2022-07-06T00:33:16.000Z",
54+
"eventType": "WorkflowTaskCompleted",
55+
"version": "0",
56+
"taskId": "1057190",
57+
"workflowTaskCompletedEventAttributes": {
58+
"scheduledEventId": "2",
59+
"startedEventId": "3",
60+
"identity": "61356@Roey-Bermans-MacBook-Pro.local",
61+
"binaryChecksum": "@temporalio/worker@1.0.0-rc.0"
62+
}
63+
},
64+
{
65+
"eventId": "5",
66+
"eventTime": "2022-07-06T00:33:16.000Z",
67+
"eventType": "ActivityTaskScheduled",
68+
"version": "0",
69+
"taskId": "1057191",
70+
"activityTaskScheduledEventAttributes": {
71+
"activityId": "1",
72+
"activityType": { "name": "fakeProgress" },
73+
"namespace": "",
74+
"taskQueue": { "name": "test", "kind": "Normal" },
75+
"header": { "fields": {} },
76+
"scheduleToCloseTimeout": "0s",
77+
"scheduleToStartTimeout": "0s",
78+
"startToCloseTimeout": "200s",
79+
"heartbeatTimeout": "0s",
80+
"workflowTaskCompletedEventId": "4",
81+
"retryPolicy": {
82+
"nonRetryableErrorTypes": [],
83+
"initialInterval": "1s",
84+
"backoffCoefficient": 2,
85+
"maximumInterval": "100s",
86+
"maximumAttempts": 0
87+
}
88+
}
89+
},
90+
{
91+
"eventId": "6",
92+
"eventTime": "2022-07-06T00:33:17.000Z",
93+
"eventType": "WorkflowExecutionSignaled",
94+
"version": "0",
95+
"taskId": "1057215",
96+
"workflowExecutionSignaledEventAttributes": {
97+
"signalName": "activityStarted",
98+
"input": { "payloads": [] },
99+
"identity": "61356@Roey-Bermans-MacBook-Pro.local"
100+
}
101+
},
102+
{
103+
"eventId": "7",
104+
"eventTime": "2022-07-06T00:33:17.000Z",
105+
"eventType": "WorkflowTaskScheduled",
106+
"version": "0",
107+
"taskId": "1057216",
108+
"workflowTaskScheduledEventAttributes": {
109+
"taskQueue": {
110+
"name": "61356@Roey-Bermans-MacBook-Pro.local-test-60cf35864bfd47c7ae2c6e5ce805118e",
111+
"kind": "Sticky"
112+
},
113+
"startToCloseTimeout": "10s",
114+
"attempt": 1
115+
}
116+
},
117+
{
118+
"eventId": "8",
119+
"eventTime": "2022-07-06T00:33:17.000Z",
120+
"eventType": "WorkflowTaskStarted",
121+
"version": "0",
122+
"taskId": "1057220",
123+
"workflowTaskStartedEventAttributes": {
124+
"scheduledEventId": "7",
125+
"identity": "61356@Roey-Bermans-MacBook-Pro.local",
126+
"requestId": "619e6c10-1e58-4d01-a28a-cc14d498311e"
127+
}
128+
},
129+
{
130+
"eventId": "9",
131+
"eventTime": "2022-07-06T00:33:17.000Z",
132+
"eventType": "WorkflowTaskCompleted",
133+
"version": "0",
134+
"taskId": "1057223",
135+
"workflowTaskCompletedEventAttributes": {
136+
"scheduledEventId": "7",
137+
"startedEventId": "8",
138+
"identity": "61356@Roey-Bermans-MacBook-Pro.local",
139+
"binaryChecksum": "@temporalio/worker@1.0.0-rc.0"
140+
}
141+
},
142+
{
143+
"eventId": "10",
144+
"eventTime": "2022-07-06T00:33:17.000Z",
145+
"eventType": "ActivityTaskCancelRequested",
146+
"version": "0",
147+
"taskId": "1057224",
148+
"activityTaskCancelRequestedEventAttributes": { "scheduledEventId": "5", "workflowTaskCompletedEventId": "9" }
149+
},
150+
{
151+
"eventId": "11",
152+
"eventTime": "2022-07-06T00:33:17.000Z",
153+
"eventType": "ActivityTaskStarted",
154+
"version": "0",
155+
"taskId": "1057226",
156+
"activityTaskStartedEventAttributes": {
157+
"scheduledEventId": "5",
158+
"identity": "61356@Roey-Bermans-MacBook-Pro.local",
159+
"requestId": "26731c3a-36dc-470e-ba2c-c5dcefd992a4",
160+
"attempt": 1
161+
}
162+
},
163+
{
164+
"eventId": "12",
165+
"eventTime": "2022-07-06T00:33:18.000Z",
166+
"eventType": "ActivityTaskCanceled",
167+
"version": "0",
168+
"taskId": "1057227",
169+
"activityTaskCanceledEventAttributes": {
170+
"latestCancelRequestedEventId": "10",
171+
"scheduledEventId": "5",
172+
"startedEventId": "11",
173+
"identity": "61356@Roey-Bermans-MacBook-Pro.local"
174+
}
175+
},
176+
{
177+
"eventId": "13",
178+
"eventTime": "2022-07-06T00:33:18.000Z",
179+
"eventType": "WorkflowTaskScheduled",
180+
"version": "0",
181+
"taskId": "1057228",
182+
"workflowTaskScheduledEventAttributes": {
183+
"taskQueue": {
184+
"name": "61356@Roey-Bermans-MacBook-Pro.local-test-60cf35864bfd47c7ae2c6e5ce805118e",
185+
"kind": "Sticky"
186+
},
187+
"startToCloseTimeout": "10s",
188+
"attempt": 1
189+
}
190+
},
191+
{
192+
"eventId": "14",
193+
"eventTime": "2022-07-06T00:33:18.000Z",
194+
"eventType": "WorkflowTaskStarted",
195+
"version": "0",
196+
"taskId": "1057232",
197+
"workflowTaskStartedEventAttributes": {
198+
"scheduledEventId": "13",
199+
"identity": "61356@Roey-Bermans-MacBook-Pro.local",
200+
"requestId": "6cca36d9-3d06-4b10-b279-67d6b8362787"
201+
}
202+
},
203+
{
204+
"eventId": "15",
205+
"eventTime": "2022-07-06T00:33:18.000Z",
206+
"eventType": "WorkflowTaskCompleted",
207+
"version": "0",
208+
"taskId": "1057235",
209+
"workflowTaskCompletedEventAttributes": {
210+
"scheduledEventId": "13",
211+
"startedEventId": "14",
212+
"identity": "61356@Roey-Bermans-MacBook-Pro.local",
213+
"binaryChecksum": "@temporalio/worker@1.0.0-rc.0"
214+
}
215+
},
216+
{
217+
"eventId": "16",
218+
"eventTime": "2022-07-06T00:33:18.000Z",
219+
"eventType": "WorkflowExecutionCompleted",
220+
"version": "0",
221+
"taskId": "1057236",
222+
"workflowExecutionCompletedEventAttributes": {
223+
"result": { "payloads": [{ "metadata": { "encoding": "YmluYXJ5L251bGw=" }, "data": "" }] },
224+
"workflowTaskCompletedEventId": "15",
225+
"newExecutionRunId": ""
226+
}
227+
}
228+
]
229+
}

packages/test/src/test-proto-utils.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import { historyFromJSON } from '@temporalio/common/lib/proto-utils';
2+
import proto from '@temporalio/proto';
3+
import test from 'ava';
4+
import * as fs from 'fs';
5+
import Long from 'long';
6+
import path from 'path';
7+
8+
const EventType = proto.temporal.api.enums.v1.EventType;
9+
const ContinueAsNewInitiator = proto.temporal.api.enums.v1.ContinueAsNewInitiator;
10+
const TaskQueueKind = proto.temporal.api.enums.v1.TaskQueueKind;
11+
12+
test('cancel-fake-progress-replay', async (t) => {
13+
const histJSON = JSON.parse(
14+
await fs.promises.readFile(path.resolve(__dirname, '../history_files/cancel_fake_progress_history.json'), 'utf8')
15+
);
16+
const hist = historyFromJSON(histJSON);
17+
t.deepEqual(hist.events?.[15].eventId, new Long(16));
18+
t.is(hist.events?.[15].eventType, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED);
19+
t.is(
20+
hist.events?.[0].workflowExecutionStartedEventAttributes?.initiator,
21+
ContinueAsNewInitiator.CONTINUE_AS_NEW_INITIATOR_UNSPECIFIED
22+
);
23+
t.is(hist.events?.[0].workflowExecutionStartedEventAttributes?.taskQueue?.kind, TaskQueueKind.TASK_QUEUE_KIND_NORMAL);
24+
});

packages/test/src/test-replay.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,21 @@ test('cancel-fake-progress-replay', async (t) => {
3030
await Worker.runReplayHistory(
3131
{
3232
workflowsPath: require.resolve('./workflows'),
33-
replayName: t.title,
33+
},
34+
hist
35+
);
36+
t.pass();
37+
});
38+
39+
test('cancel-fake-progress-replay from JSON', async (t) => {
40+
const histJson = await fs.promises.readFile(
41+
path.resolve(__dirname, '../history_files/cancel_fake_progress_history.json'),
42+
'utf8'
43+
);
44+
const hist = JSON.parse(histJson);
45+
await Worker.runReplayHistory(
46+
{
47+
workflowsPath: require.resolve('./workflows'),
3448
},
3549
hist
3650
);
@@ -51,7 +65,6 @@ test('cancel-fake-progress-replay-nondeterministic', async (t) => {
5165
Worker.runReplayHistory(
5266
{
5367
workflowsPath: require.resolve('./workflows'),
54-
replayName: t.title,
5568
},
5669
hist
5770
),

0 commit comments

Comments
 (0)