Skip to content

Commit e91df70

Browse files
authored
fix(workflow): Workflow Activation Encoder was discarding SDK flags (#1530)
1 parent 01ee142 commit e91df70

13 files changed

+989
-199
lines changed

packages/common/src/proto-utils.ts

Lines changed: 97 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -5,85 +5,99 @@ import { patchProtobufRoot } from '@temporalio/proto/lib/patch-protobuf-root';
55
export type History = proto.temporal.api.history.v1.IHistory;
66
export type Payload = proto.temporal.api.common.v1.IPayload;
77

8+
/**
9+
* JSON representation of Temporal's {@link Payload} protobuf object
10+
*/
11+
export interface JSONPayload {
12+
/**
13+
* Mapping of key to base64 encoded value
14+
*/
15+
metadata?: Record<string, string> | null;
16+
/**
17+
* base64 encoded value
18+
*/
19+
data?: string | null;
20+
}
21+
822
// Cast to any because the generated proto module types are missing the lookupType method
923
const patched = patchProtobufRoot(proto) as any;
1024
const historyType = patched.lookupType('temporal.api.history.v1.History');
1125
const payloadType = patched.lookupType('temporal.api.common.v1.Payload');
1226

13-
function pascalCaseToConstantCase(s: string) {
14-
return s.replace(/[^\b][A-Z]/g, (m) => `${m[0]}_${m[1]}`).toUpperCase();
15-
}
16-
17-
function fixEnumValue<O extends Record<string, any>>(obj: O, attr: keyof O, prefix: string) {
18-
return (
19-
obj[attr] && {
20-
[attr]: obj[attr].startsWith(prefix) ? obj[attr] : `${prefix}_${pascalCaseToConstantCase(obj[attr])}`,
21-
}
22-
);
23-
}
27+
/**
28+
* Convert a proto JSON representation of History to a valid History object
29+
*/
30+
export function historyFromJSON(history: unknown): History {
31+
function pascalCaseToConstantCase(s: string) {
32+
return s.replace(/[^\b][A-Z]/g, (m) => `${m[0]}_${m[1]}`).toUpperCase();
33+
}
2434

25-
// fromProto3JSON doesn't allow null values on 'bytes' fields. This turns out to be a problem for payloads.
26-
// Recursively descend on objects and array, and fix in-place any payload that has a null data field
27-
function fixPayloads<T>(e: T): T {
28-
function isPayload(p: any): p is JSONPayload {
29-
return p && typeof p === 'object' && 'metadata' in p && 'data' in p;
35+
function fixEnumValue<O extends Record<string, any>>(obj: O, attr: keyof O, prefix: string) {
36+
return (
37+
obj[attr] && {
38+
[attr]: obj[attr].startsWith(prefix) ? obj[attr] : `${prefix}_${pascalCaseToConstantCase(obj[attr])}`,
39+
}
40+
);
3041
}
3142

32-
if (e && typeof e === 'object') {
33-
if (isPayload(e)) {
34-
if (e.data === null) {
35-
const { data: _data, ...rest } = e;
36-
return rest as T;
43+
// fromProto3JSON doesn't allow null values on 'bytes' fields. This turns out to be a problem for payloads.
44+
// Recursively descend on objects and array, and fix in-place any payload that has a null data field
45+
function fixPayloads<T>(e: T): T {
46+
function isPayload(p: any): p is JSONPayload {
47+
return p && typeof p === 'object' && 'metadata' in p && 'data' in p;
48+
}
49+
50+
if (e && typeof e === 'object') {
51+
if (isPayload(e)) {
52+
if (e.data === null) {
53+
const { data: _data, ...rest } = e;
54+
return rest as T;
55+
}
56+
return e;
3757
}
38-
return e;
58+
if (Array.isArray(e)) return e.map(fixPayloads) as T;
59+
return Object.fromEntries(Object.entries(e as object).map(([k, v]) => [k, fixPayloads(v)])) as T;
3960
}
40-
if (Array.isArray(e)) return e.map(fixPayloads) as T;
41-
return Object.fromEntries(Object.entries(e as object).map(([k, v]) => [k, fixPayloads(v)])) as T;
61+
return e;
4262
}
43-
return e;
44-
}
4563

46-
function fixHistoryEvent(e: Record<string, any>) {
47-
const type = Object.keys(e).find((k) => k.endsWith('EventAttributes'));
48-
if (!type) {
49-
throw new TypeError(`Missing attributes in history event: ${JSON.stringify(e)}`);
50-
}
64+
function fixHistoryEvent(e: Record<string, any>) {
65+
const type = Object.keys(e).find((k) => k.endsWith('EventAttributes'));
66+
if (!type) {
67+
throw new TypeError(`Missing attributes in history event: ${JSON.stringify(e)}`);
68+
}
5169

52-
// Fix payloads with null data
53-
e = fixPayloads(e);
54-
55-
return {
56-
...e,
57-
...fixEnumValue(e, 'eventType', 'EVENT_TYPE'),
58-
[type]: {
59-
...e[type],
60-
...(e[type].taskQueue && {
61-
taskQueue: { ...e[type].taskQueue, ...fixEnumValue(e[type].taskQueue, 'kind', 'TASK_QUEUE_KIND') },
62-
}),
63-
...fixEnumValue(e[type], 'parentClosePolicy', 'PARENT_CLOSE_POLICY'),
64-
...fixEnumValue(e[type], 'workflowIdReusePolicy', 'WORKFLOW_ID_REUSE_POLICY'),
65-
...fixEnumValue(e[type], 'initiator', 'CONTINUE_AS_NEW_INITIATOR'),
66-
...fixEnumValue(e[type], 'retryState', 'RETRY_STATE'),
67-
...(e[type].childWorkflowExecutionFailureInfo && {
68-
childWorkflowExecutionFailureInfo: {
69-
...e[type].childWorkflowExecutionFailureInfo,
70-
...fixEnumValue(e[type].childWorkflowExecutionFailureInfo, 'retryState', 'RETRY_STATE'),
71-
},
72-
}),
73-
},
74-
};
75-
}
70+
// Fix payloads with null data
71+
e = fixPayloads(e);
7672

77-
function fixHistory(h: Record<string, any>) {
78-
return {
79-
events: h.events.map(fixHistoryEvent),
80-
};
81-
}
73+
return {
74+
...e,
75+
...fixEnumValue(e, 'eventType', 'EVENT_TYPE'),
76+
[type]: {
77+
...e[type],
78+
...(e[type].taskQueue && {
79+
taskQueue: { ...e[type].taskQueue, ...fixEnumValue(e[type].taskQueue, 'kind', 'TASK_QUEUE_KIND') },
80+
}),
81+
...fixEnumValue(e[type], 'parentClosePolicy', 'PARENT_CLOSE_POLICY'),
82+
...fixEnumValue(e[type], 'workflowIdReusePolicy', 'WORKFLOW_ID_REUSE_POLICY'),
83+
...fixEnumValue(e[type], 'initiator', 'CONTINUE_AS_NEW_INITIATOR'),
84+
...fixEnumValue(e[type], 'retryState', 'RETRY_STATE'),
85+
...(e[type].childWorkflowExecutionFailureInfo && {
86+
childWorkflowExecutionFailureInfo: {
87+
...e[type].childWorkflowExecutionFailureInfo,
88+
...fixEnumValue(e[type].childWorkflowExecutionFailureInfo, 'retryState', 'RETRY_STATE'),
89+
},
90+
}),
91+
},
92+
};
93+
}
94+
95+
function fixHistory(h: Record<string, any>) {
96+
return {
97+
events: h.events.map(fixHistoryEvent),
98+
};
99+
}
82100

83-
/**
84-
* Convert a proto JSON representation of History to a valid History object
85-
*/
86-
export function historyFromJSON(history: unknown): History {
87101
if (typeof history !== 'object' || history == null || !Array.isArray((history as any).events)) {
88102
throw new TypeError('Invalid history, expected an object with an array of events');
89103
}
@@ -95,17 +109,26 @@ export function historyFromJSON(history: unknown): History {
95109
}
96110

97111
/**
98-
* JSON representation of Temporal's {@link Payload} protobuf object
112+
* Convert an History object, e.g. as returned by `WorkflowClient.list().withHistory()`, to a JSON
113+
* string that adheres to the same norm as JSON history files produced by other Temporal tools.
99114
*/
100-
export interface JSONPayload {
101-
/**
102-
* Mapping of key to base64 encoded value
103-
*/
104-
metadata?: Record<string, string> | null;
105-
/**
106-
* base64 encoded value
107-
*/
108-
data?: string | null;
115+
export function historyToJSON(history: History): string {
116+
// toProto3JSON doesn't correctly handle some of our "bytes" fields, passing them untouched to the
117+
// output, after which JSON.stringify() would convert them to an array of numbers. As a workaround,
118+
// recursively walk the object and convert all Buffer instances to base64 strings. Note this only
119+
// works on proto3-json-serializer v2.0.0. v2.0.2 throws an error before we even get the chance
120+
// to fix the buffers. See https://github.com/googleapis/proto3-json-serializer-nodejs/issues/103.
121+
function fixBuffers<T>(e: T): T {
122+
if (e && typeof e === 'object') {
123+
if (e instanceof Buffer) return e.toString('base64') as any;
124+
if (Array.isArray(e)) return e.map(fixBuffers) as T;
125+
return Object.fromEntries(Object.entries(e as object).map(([k, v]) => [k, fixBuffers(v)])) as T;
126+
}
127+
return e;
128+
}
129+
130+
const protoJson = toProto3JSON(proto.temporal.api.history.v1.History.fromObject(history) as any);
131+
return JSON.stringify(fixBuffers(protoJson), null, 2);
109132
}
110133

111134
/**
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
{
2+
"events": [
3+
{
4+
"eventId": "1",
5+
"eventTime": "2024-09-23T18:06:34.810426Z",
6+
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED",
7+
"taskId": "1048604",
8+
"workflowExecutionStartedEventAttributes": {
9+
"workflowType": { "name": "langFlagsReplayCorrectly" },
10+
"taskQueue": { "name": "Lang's_SDK_flags_replay_correctly", "kind": "TASK_QUEUE_KIND_NORMAL" },
11+
"input": {},
12+
"workflowTaskTimeout": "10s",
13+
"originalExecutionRunId": "6cb24f2a-74ed-4a8e-9db0-8a0e3dfebaef",
14+
"identity": "12645@JamesMBTemporal",
15+
"firstExecutionRunId": "6cb24f2a-74ed-4a8e-9db0-8a0e3dfebaef",
16+
"attempt": 1,
17+
"firstWorkflowTaskBackoff": "0s",
18+
"header": { "fields": {} },
19+
"workflowId": "4d57b445-d3e7-4e20-bef9-36193a29c380"
20+
}
21+
},
22+
{
23+
"eventId": "2",
24+
"eventTime": "2024-09-23T18:06:34.810489Z",
25+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
26+
"taskId": "1048605",
27+
"workflowTaskScheduledEventAttributes": {
28+
"taskQueue": { "name": "Lang's_SDK_flags_replay_correctly", "kind": "TASK_QUEUE_KIND_NORMAL" },
29+
"startToCloseTimeout": "10s",
30+
"attempt": 1
31+
}
32+
},
33+
{
34+
"eventId": "3",
35+
"eventTime": "2024-09-23T18:06:34.816506Z",
36+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
37+
"taskId": "1048610",
38+
"workflowTaskStartedEventAttributes": {
39+
"scheduledEventId": "2",
40+
"identity": "12645@JamesMBTemporal",
41+
"requestId": "7db844e1-00d2-4536-b37c-4eaa8abc9d58",
42+
"historySizeBytes": "320",
43+
"workerVersion": {
44+
"buildId": "@temporalio/worker@1.11.1+16db4f650868873e61d91ca2f859548358ffd2edd8cb7798c050afc152aca4d3"
45+
}
46+
}
47+
},
48+
{
49+
"eventId": "4",
50+
"eventTime": "2024-09-23T18:06:34.869609Z",
51+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
52+
"taskId": "1048615",
53+
"workflowTaskCompletedEventAttributes": {
54+
"scheduledEventId": "2",
55+
"startedEventId": "3",
56+
"identity": "12645@JamesMBTemporal",
57+
"workerVersion": {
58+
"buildId": "@temporalio/worker@1.11.1+16db4f650868873e61d91ca2f859548358ffd2edd8cb7798c050afc152aca4d3"
59+
},
60+
"sdkMetadata": { "coreUsedFlags": [3, 1, 2] },
61+
"meteringMetadata": {}
62+
}
63+
},
64+
{
65+
"eventId": "5",
66+
"eventTime": "2024-09-23T18:06:34.869660Z",
67+
"eventType": "EVENT_TYPE_TIMER_STARTED",
68+
"taskId": "1048616",
69+
"timerStartedEventAttributes": {
70+
"timerId": "1",
71+
"startToFireTimeout": "10s",
72+
"workflowTaskCompletedEventId": "4"
73+
}
74+
},
75+
{
76+
"eventId": "6",
77+
"eventTime": "2024-09-23T18:06:34.869701Z",
78+
"eventType": "EVENT_TYPE_ACTIVITY_TASK_SCHEDULED",
79+
"taskId": "1048617",
80+
"activityTaskScheduledEventAttributes": {
81+
"activityId": "1",
82+
"activityType": { "name": "noopActivity" },
83+
"taskQueue": { "name": "Lang's_SDK_flags_replay_correctly", "kind": "TASK_QUEUE_KIND_NORMAL" },
84+
"header": { "fields": {} },
85+
"scheduleToCloseTimeout": "10s",
86+
"scheduleToStartTimeout": "10s",
87+
"startToCloseTimeout": "10s",
88+
"heartbeatTimeout": "0s",
89+
"workflowTaskCompletedEventId": "4",
90+
"retryPolicy": { "initialInterval": "1s", "backoffCoefficient": 2, "maximumInterval": "100s" },
91+
"useWorkflowBuildId": true
92+
}
93+
},
94+
{
95+
"eventId": "7",
96+
"eventTime": "2024-09-23T18:06:34.869850Z",
97+
"eventType": "EVENT_TYPE_ACTIVITY_TASK_STARTED",
98+
"taskId": "1048622",
99+
"activityTaskStartedEventAttributes": {
100+
"scheduledEventId": "6",
101+
"identity": "12645@JamesMBTemporal",
102+
"requestId": "db9e723d-202c-4d75-ae5f-06a8e5489428",
103+
"attempt": 1,
104+
"workerVersion": {
105+
"buildId": "@temporalio/worker@1.11.1+16db4f650868873e61d91ca2f859548358ffd2edd8cb7798c050afc152aca4d3"
106+
}
107+
}
108+
},
109+
{
110+
"eventId": "8",
111+
"eventTime": "2024-09-23T18:06:34.878511Z",
112+
"eventType": "EVENT_TYPE_ACTIVITY_TASK_COMPLETED",
113+
"taskId": "1048623",
114+
"activityTaskCompletedEventAttributes": {
115+
"result": {
116+
"payloads": [{ "metadata": { "encoding": "YmluYXJ5L251bGw=" }, "data": "" }]
117+
},
118+
"scheduledEventId": "6",
119+
"startedEventId": "7",
120+
"identity": "12645@JamesMBTemporal"
121+
}
122+
},
123+
{
124+
"eventId": "9",
125+
"eventTime": "2024-09-23T18:06:34.878515Z",
126+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
127+
"taskId": "1048624",
128+
"workflowTaskScheduledEventAttributes": {
129+
"taskQueue": {
130+
"name": "12645@JamesMBTemporal-d05d6c11960c40e792c37014aed3ec23",
131+
"kind": "TASK_QUEUE_KIND_STICKY",
132+
"normalName": "Lang's_SDK_flags_replay_correctly"
133+
},
134+
"startToCloseTimeout": "10s",
135+
"attempt": 1
136+
}
137+
},
138+
{
139+
"eventId": "10",
140+
"eventTime": "2024-09-23T18:06:34.879463Z",
141+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
142+
"taskId": "1048628",
143+
"workflowTaskStartedEventAttributes": {
144+
"scheduledEventId": "9",
145+
"identity": "12645@JamesMBTemporal",
146+
"requestId": "805b6064-313b-4a4d-a4fd-d29ec4a1afea",
147+
"historySizeBytes": "1223",
148+
"workerVersion": {
149+
"buildId": "@temporalio/worker@1.11.1+16db4f650868873e61d91ca2f859548358ffd2edd8cb7798c050afc152aca4d3"
150+
}
151+
}
152+
},
153+
{
154+
"eventId": "11",
155+
"eventTime": "2024-09-23T18:06:34.889077Z",
156+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
157+
"taskId": "1048632",
158+
"workflowTaskCompletedEventAttributes": {
159+
"scheduledEventId": "9",
160+
"startedEventId": "10",
161+
"identity": "12645@JamesMBTemporal",
162+
"workerVersion": {
163+
"buildId": "@temporalio/worker@1.11.1+16db4f650868873e61d91ca2f859548358ffd2edd8cb7798c050afc152aca4d3"
164+
},
165+
"sdkMetadata": {},
166+
"meteringMetadata": {}
167+
}
168+
},
169+
{
170+
"eventId": "12",
171+
"eventTime": "2024-09-23T18:06:34.889101Z",
172+
"eventType": "EVENT_TYPE_TIMER_CANCELED",
173+
"taskId": "1048633",
174+
"timerCanceledEventAttributes": {
175+
"timerId": "1",
176+
"startedEventId": "5",
177+
"workflowTaskCompletedEventId": "11",
178+
"identity": "12645@JamesMBTemporal"
179+
}
180+
},
181+
{
182+
"eventId": "13",
183+
"eventTime": "2024-09-23T18:06:34.889124Z",
184+
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED",
185+
"taskId": "1048634",
186+
"workflowExecutionCompletedEventAttributes": {
187+
"result": {
188+
"payloads": [{ "metadata": { "encoding": "YmluYXJ5L251bGwK" }, "data": "" }]
189+
},
190+
"workflowTaskCompletedEventId": "11"
191+
}
192+
}
193+
]
194+
}

0 commit comments

Comments
 (0)