Skip to content

Commit d6e2738

Browse files
authored
fix(workflow): process all activation jobs as a single batch (#1488)
1 parent 56207d8 commit d6e2738

22 files changed

+1245
-261
lines changed

docs/activation-in-debug-mode.mermaid

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,29 @@ sequenceDiagram
1212
end
1313
Core->>-MT: Respond with Activation
1414
MT->>MT: Decode Payloads
15-
loop patches, signals, updates, completions, queries as jobs
16-
MT->>VM: Activate(jobs)
15+
MT->>+WT: Run Workflow Activation
16+
17+
WT->>VM: Update Activator (now, WorkflowInfo, SDK flags, patches)
18+
19+
alt "Single Batch mode"
20+
WT->>VM: Activate(queries)
1721
VM->>VM: Run Microtasks
18-
MT->>VM: Try Unblock Conditions
22+
WT->>VM: Try Unblock Conditions
23+
else Legacy "Multi Batches mode"
24+
loop [signals, updates+completions] as jobs
25+
WT->>VM: Activate(jobs)
26+
VM->>VM: Run Microtasks
27+
WT->>VM: Try Unblock Conditions
28+
end
1929
end
30+
2031
MT->>VM: Collect Commands
2132
MT->>MT: Encode Payloads
2233
MT->>+VM: Collect Sink Calls
2334
VM-->>-MT: Respond with Sink Calls
2435
MT->>MT: Run Sink Functions
2536
MT->>Core: Complete Activation
26-
opt Completed Workflow Task
37+
opt Completed Workflow Task
2738
Core->>Server: Complete Workflow Task
2839
end
2940

docs/activation.mermaid

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,21 @@ sequenceDiagram
1414
Core->>-MT: Respond with Activation
1515
MT->>MT: Decode Payloads
1616
MT->>+WT: Run Workflow Activation
17-
loop patches, signals, updates, completions, queries as jobs
18-
WT->>VM: Activate(jobs)
17+
18+
WT->>VM: Update Activator (now, WorkflowInfo, SDK flags, patches)
19+
20+
alt "Single Batch mode"
21+
WT->>VM: Activate(queries)
1922
VM->>VM: Run Microtasks
2023
WT->>VM: Try Unblock Conditions
24+
else Legacy "Multi Batches mode"
25+
loop [signals, updates+completions] as jobs
26+
WT->>VM: Activate(jobs)
27+
VM->>VM: Run Microtasks
28+
WT->>VM: Try Unblock Conditions
29+
end
2130
end
31+
2232
WT->>VM: Collect Commands
2333
WT-->>-MT: Respond to Activation
2434
MT->>MT: Encode Payloads
@@ -28,6 +38,6 @@ sequenceDiagram
2838
WT-->>-MT: Respond with Sink Calls
2939
MT->>MT: Run Sink Functions
3040
MT->>Core: Complete Activation
31-
opt Completed Workflow Task
41+
opt Completed Workflow Task
3242
Core->>Server: Complete Workflow Task
3343
end
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
{
2+
"events": [
3+
{
4+
"eventId": "1",
5+
"eventTime": "2024-08-14T03:50:59.998228Z",
6+
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED",
7+
"taskId": "1048642",
8+
"workflowExecutionStartedEventAttributes": {
9+
"workflowType": {
10+
"name": "canCompleteUpdateAfterWorkflowReturns"
11+
},
12+
"taskQueue": {
13+
"name": "test",
14+
"kind": "TASK_QUEUE_KIND_NORMAL"
15+
},
16+
"workflowExecutionTimeout": "0s",
17+
"workflowRunTimeout": "0s",
18+
"workflowTaskTimeout": "10s",
19+
"originalExecutionRunId": "b9d2c3ad-e03e-49e4-857e-1939d9d32f5e",
20+
"identity": "temporal-cli:jwatkins@JamesMBTemporal",
21+
"firstExecutionRunId": "b9d2c3ad-e03e-49e4-857e-1939d9d32f5e",
22+
"attempt": 1,
23+
"firstWorkflowTaskBackoff": "0s",
24+
"header": {},
25+
"workflowId": "eb5f6727-7fb3-4f48-aba2-1bd7d46823a1"
26+
}
27+
},
28+
{
29+
"eventId": "2",
30+
"eventTime": "2024-08-14T03:50:59.998393Z",
31+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
32+
"taskId": "1048643",
33+
"workflowTaskScheduledEventAttributes": {
34+
"taskQueue": {
35+
"name": "test",
36+
"kind": "TASK_QUEUE_KIND_NORMAL"
37+
},
38+
"startToCloseTimeout": "10s",
39+
"attempt": 1
40+
}
41+
},
42+
{
43+
"eventId": "3",
44+
"eventTime": "2024-08-14T03:51:24.737259Z",
45+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
46+
"taskId": "1048648",
47+
"workflowTaskStartedEventAttributes": {
48+
"scheduledEventId": "2",
49+
"identity": "13971@JamesMBTemporal",
50+
"requestId": "f8a583b6-d423-45b7-a34d-b3c8e822d10f",
51+
"historySizeBytes": "293",
52+
"workerVersion": {
53+
"buildId": "@temporalio/worker@1.10.1+8983e4c58e21c0f316606d45c034d286695e7f31b7693b88a8ca3c102fce506c"
54+
}
55+
}
56+
},
57+
{
58+
"eventId": "4",
59+
"eventTime": "2024-08-14T03:51:24.779886Z",
60+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
61+
"taskId": "1048652",
62+
"workflowTaskCompletedEventAttributes": {
63+
"scheduledEventId": "2",
64+
"startedEventId": "3",
65+
"identity": "13971@JamesMBTemporal",
66+
"workerVersion": {
67+
"buildId": "@temporalio/worker@1.10.1+8983e4c58e21c0f316606d45c034d286695e7f31b7693b88a8ca3c102fce506c"
68+
},
69+
"sdkMetadata": {
70+
"coreUsedFlags": [2, 1]
71+
},
72+
"meteringMetadata": {}
73+
}
74+
},
75+
{
76+
"eventId": "5",
77+
"eventTime": "2024-08-14T03:51:24.779952Z",
78+
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED",
79+
"taskId": "1048653",
80+
"workflowExecutionUpdateAcceptedEventAttributes": {
81+
"protocolInstanceId": "fb28b772-4538-45a4-99f0-550fae0b7668",
82+
"acceptedRequestMessageId": "fb28b772-4538-45a4-99f0-550fae0b7668/request",
83+
"acceptedRequestSequencingEventId": "2",
84+
"acceptedRequest": {
85+
"meta": {
86+
"updateId": "fb28b772-4538-45a4-99f0-550fae0b7668",
87+
"identity": "temporal-cli:jwatkins@JamesMBTemporal"
88+
},
89+
"input": {
90+
"header": {},
91+
"name": "doneUpdate"
92+
}
93+
}
94+
}
95+
},
96+
{
97+
"eventId": "6",
98+
"eventTime": "2024-08-14T03:51:24.779982Z",
99+
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED",
100+
"taskId": "1048654",
101+
"workflowExecutionCompletedEventAttributes": {
102+
"result": {
103+
"payloads": [
104+
{
105+
"metadata": {
106+
"encoding": "YmluYXJ5L251bGw="
107+
}
108+
}
109+
]
110+
},
111+
"workflowTaskCompletedEventId": "4"
112+
}
113+
}
114+
]
115+
}

packages/test/src/helpers-integration.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@ import {
1616
DefaultLogger,
1717
LogEntry,
1818
LogLevel,
19+
ReplayWorkerOptions,
1920
Runtime,
2021
WorkerOptions,
2122
WorkflowBundle,
2223
bundleWorkflowCode,
2324
makeTelemetryFilterString,
2425
} from '@temporalio/worker';
2526
import * as workflow from '@temporalio/workflow';
27+
import { temporal } from '@temporalio/proto';
2628
import { ConnectionInjectorInterceptor } from './activities/interceptors';
2729
import {
2830
Worker,
@@ -105,6 +107,7 @@ export function makeTestFunction(opts: {
105107
export interface Helpers {
106108
taskQueue: string;
107109
createWorker(opts?: Partial<WorkerOptions>): Promise<Worker>;
110+
runReplayHistory(opts: Partial<ReplayWorkerOptions>, history: temporal.api.history.v1.IHistory): Promise<void>;
108111
executeWorkflow<T extends () => Promise<any>>(workflowType: T): Promise<workflow.WorkflowResultType<T>>;
109112
executeWorkflow<T extends workflow.Workflow>(
110113
fn: T,
@@ -137,6 +140,18 @@ export function helpers(t: ExecutionContext<Context>, testEnv: TestWorkflowEnvir
137140
...opts,
138141
});
139142
},
143+
async runReplayHistory(
144+
opts: Partial<ReplayWorkerOptions>,
145+
history: temporal.api.history.v1.IHistory
146+
): Promise<void> {
147+
await Worker.runReplayHistory(
148+
{
149+
workflowBundle: t.context.workflowBundle,
150+
...opts,
151+
},
152+
history
153+
);
154+
},
140155
async executeWorkflow(
141156
fn: workflow.Workflow,
142157
opts?: Omit<WorkflowStartOptions, 'taskQueue' | 'workflowId'>

packages/test/src/helpers.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import * as fs from 'fs/promises';
12
import * as net from 'net';
23
import path from 'path';
34
import StackUtils from 'stack-utils';
@@ -128,6 +129,7 @@ export const bundlerOptions = {
128129
'async-retry',
129130
'uuid',
130131
'net',
132+
'fs/promises',
131133
],
132134
};
133135

@@ -293,3 +295,15 @@ export function asSdkLoggerSink(
293295
},
294296
};
295297
}
298+
299+
export async function getHistories(fname: string): Promise<iface.temporal.api.history.v1.History> {
300+
const isJson = fname.endsWith('json');
301+
const fpath = path.resolve(__dirname, `../history_files/${fname}`);
302+
if (isJson) {
303+
const hist = await fs.readFile(fpath, 'utf8');
304+
return JSON.parse(hist);
305+
} else {
306+
const hist = await fs.readFile(fpath);
307+
return iface.temporal.api.history.v1.History.decode(hist);
308+
}
309+
}

0 commit comments

Comments
 (0)