Skip to content

Commit 737eb92

Browse files
authored
feat(worker): Add support for concurrent pollers and more (#1132)
1 parent 993115d commit 737eb92

File tree

19 files changed

+369
-119
lines changed

19 files changed

+369
-119
lines changed

package-lock.json

Lines changed: 2 additions & 32 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/core-bridge/src/conversions.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,10 @@ impl ObjectHandleConversionsExt for Handle<'_, JsObject> {
294294
js_value_getter!(cx, self, "maxConcurrentLocalActivityExecutions", JsNumber) as usize;
295295
let max_outstanding_workflow_tasks =
296296
js_value_getter!(cx, self, "maxConcurrentWorkflowTaskExecutions", JsNumber) as usize;
297+
let max_concurrent_wft_polls =
298+
js_value_getter!(cx, self, "maxConcurrentWorkflowTaskPolls", JsNumber) as usize;
299+
let max_concurrent_at_polls =
300+
js_value_getter!(cx, self, "maxConcurrentActivityTaskPolls", JsNumber) as usize;
297301
let sticky_queue_schedule_to_start_timeout = Duration::from_millis(js_value_getter!(
298302
cx,
299303
self,
@@ -335,6 +339,8 @@ impl ObjectHandleConversionsExt for Handle<'_, JsObject> {
335339
.max_outstanding_workflow_tasks(max_outstanding_workflow_tasks)
336340
.max_outstanding_activities(max_outstanding_activities)
337341
.max_outstanding_local_activities(max_outstanding_local_activities)
342+
.max_concurrent_wft_polls(max_concurrent_wft_polls)
343+
.max_concurrent_at_polls(max_concurrent_at_polls)
338344
.max_cached_workflows(max_cached_workflows)
339345
.sticky_queue_schedule_to_start_timeout(sticky_queue_schedule_to_start_timeout)
340346
.graceful_shutdown_period(graceful_shutdown_period)

packages/core-bridge/src/runtime.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,9 @@ pub fn start_bridge_loop(
120120
) {
121121
let mut tokio_builder = tokio::runtime::Builder::new_multi_thread();
122122
tokio_builder.enable_all().thread_name("core");
123-
let core_runtime =
124-
Arc::new(CoreRuntime::new(telemetry_options, tokio_builder).expect("Failed to create CoreRuntime"));
123+
let core_runtime = Arc::new(
124+
CoreRuntime::new(telemetry_options, tokio_builder).expect("Failed to create CoreRuntime"),
125+
);
125126

126127
core_runtime.tokio_handle().block_on(async {
127128
loop {

packages/core-bridge/ts/index.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,16 @@ export interface WorkerOptions {
277277
maxConcurrentWorkflowTaskExecutions: number;
278278
maxConcurrentLocalActivityExecutions: number;
279279

280+
/**
281+
* Maximum number of Workflow tasks to poll concurrently.
282+
*/
283+
maxConcurrentWorkflowTaskPolls: number;
284+
285+
/**
286+
* Maximum number of Activity tasks to poll concurrently.
287+
*/
288+
maxConcurrentActivityTaskPolls: number;
289+
280290
/**
281291
* If set to `false` this worker will only handle workflow tasks and local activities, it will not
282292
* poll for activity tasks.

packages/test/src/load/args.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,15 @@ export type Spec = Record<string, () => any>;
1111
export interface SetupArgSpec extends Spec {
1212
'--ns': typeof String;
1313
'--server-address': typeof String;
14+
'--client-cert-path': typeof String;
15+
'--client-key-path': typeof String;
1416
}
1517

1618
export const setupArgSpec: SetupArgSpec = {
1719
'--ns': String,
1820
'--server-address': String,
21+
'--client-cert-path': String,
22+
'--client-key-path': String,
1923
};
2024

2125
export interface StarterArgSpec extends Spec {
@@ -33,6 +37,8 @@ export interface StarterArgSpec extends Spec {
3337
'--do-query': typeof String;
3438
'--initial-query-delay-ms': typeof Number;
3539
'--query-interval-ms': typeof Number;
40+
'--client-cert-path': typeof String;
41+
'--client-key-path': typeof String;
3642
}
3743

3844
export const starterArgSpec: StarterArgSpec = {
@@ -50,6 +56,8 @@ export const starterArgSpec: StarterArgSpec = {
5056
'--do-query': String,
5157
'--initial-query-delay-ms': Number,
5258
'--query-interval-ms': Number,
59+
'--client-cert-path': String,
60+
'--client-key-path': String,
5361
};
5462

5563
export interface WorkerArgSpec extends Spec {
@@ -59,12 +67,17 @@ export interface WorkerArgSpec extends Spec {
5967
'--max-concurrent-at-executions': typeof Number;
6068
'--max-concurrent-wft-executions': typeof Number;
6169
'--max-concurrent-la-executions': typeof Number;
70+
'--max-wft-pollers': typeof Number;
71+
'--max-at-pollers': typeof Number;
72+
'--wf-thread-pool-size': typeof Number;
6273
'--log-level': typeof String;
6374
'--log-file': typeof String;
6475
'--server-address': typeof String;
6576
'--otel-url': typeof String;
6677
'--status-port': typeof Number;
6778
'--shutdown-grace-time-ms': typeof String;
79+
'--client-cert-path': typeof String;
80+
'--client-key-path': typeof String;
6881
}
6982

7083
export const workerArgSpec: WorkerArgSpec = {
@@ -74,13 +87,17 @@ export const workerArgSpec: WorkerArgSpec = {
7487
'--max-concurrent-at-executions': Number,
7588
'--max-concurrent-wft-executions': Number,
7689
'--max-concurrent-la-executions': Number,
77-
'--isolate-pool-size': Number,
90+
'--max-wft-pollers': Number,
91+
'--max-at-pollers': Number,
92+
'--wf-thread-pool-size': Number,
7893
'--log-level': String,
7994
'--log-file': String,
8095
'--server-address': String,
8196
'--otel-url': String,
8297
'--status-port': Number,
8398
'--shutdown-grace-time-ms': String,
99+
'--client-cert-path': String,
100+
'--client-key-path': String,
84101
};
85102

86103
export interface WrapperArgSpec extends Spec {

packages/test/src/load/setup.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { readFileSync } from 'fs';
12
import arg from 'arg';
23
import { Connection } from '@temporalio/client';
34
import { createNamespace, waitOnNamespace } from '@temporalio/testing/lib/utils';
@@ -8,8 +9,31 @@ async function main() {
89

910
const serverAddress = getRequired(args, '--server-address');
1011
const namespace = getRequired(args, '--ns');
12+
const clientCertPath = args['--client-cert-path'];
13+
const clientKeyPath = args['--client-key-path'];
1114

12-
const connection = await Connection.connect({ address: serverAddress });
15+
const tlsConfig =
16+
clientCertPath && clientKeyPath
17+
? {
18+
tls: {
19+
clientCertPair: {
20+
crt: readFileSync(clientCertPath),
21+
key: readFileSync(clientKeyPath),
22+
},
23+
},
24+
}
25+
: {};
26+
27+
const connection = await Connection.connect({ address: serverAddress, ...tlsConfig });
28+
29+
console.log('Checking if namespace already exists', { namespace });
30+
try {
31+
await waitOnNamespace(connection, namespace, 1, 0);
32+
// Namespace already exists. Nothing to do
33+
return;
34+
} catch (e) {
35+
// Ignore error. Will create namespace if it does not exist
36+
}
1337

1438
await createNamespace(connection, namespace);
1539
console.log('Registered namespace', { namespace });

packages/test/src/load/starter.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import os from 'node:os';
2-
import fs from 'node:fs';
2+
import fs, { readFileSync } from 'node:fs';
33
import arg from 'arg';
44
import pidusage from 'pidusage';
55
import * as grpc from '@grpc/grpc-js';
@@ -201,7 +201,22 @@ async function main() {
201201
const workerMemoryLogFile = args['--worker-memory-log-file'];
202202
const workerCPULogFile = args['--worker-cpu-log-file'];
203203

204-
const connection = await Connection.connect({ address: serverAddress });
204+
const clientCertPath = args['--client-cert-path'];
205+
const clientKeyPath = args['--client-key-path'];
206+
207+
const tlsConfig =
208+
clientCertPath && clientKeyPath
209+
? {
210+
tls: {
211+
clientCertPair: {
212+
crt: readFileSync(clientCertPath),
213+
key: readFileSync(clientKeyPath),
214+
},
215+
},
216+
}
217+
: {};
218+
219+
const connection = await Connection.connect({ address: serverAddress, ...tlsConfig });
205220
const client = new WorkflowClient({ connection, namespace });
206221
const stopCondition = runForSeconds ? new UntilSecondsElapsed(runForSeconds) : new NumberOfWorkflows(iterations);
207222
const queryingOptions = queryName ? { queryName, queryIntervalMs, initialQueryDelayMs } : undefined;

packages/test/src/load/worker.ts

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import fs from 'node:fs';
1+
import fs, { readFileSync } from 'node:fs';
22
import http from 'node:http';
33
import { inspect } from 'node:util';
44
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-grpc';
@@ -107,10 +107,15 @@ async function main() {
107107
const maxConcurrentWorkflowTaskExecutions = args['--max-concurrent-wft-executions'] ?? 100;
108108
const maxConcurrentLocalActivityExecutions = args['--max-concurrent-la-executions'] ?? 100;
109109
const maxCachedWorkflows = args['--max-cached-wfs'];
110+
const maxConcurrentWorkflowTaskPolls = args['--max-wft-pollers'] ?? 2;
111+
const maxConcurrentActivityTaskPolls = args['--max-at-pollers'] ?? 2;
112+
const workflowThreadPoolSize: number | undefined = args['--wf-thread-pool-size'];
110113
const oTelUrl = args['--otel-url'];
111114
const logLevel = (args['--log-level'] || 'INFO').toUpperCase();
112115
const logFile = args['--log-file'];
113116
const serverAddress = getRequired(args, '--server-address');
117+
const clientCertPath = args['--client-cert-path'];
118+
const clientKeyPath = args['--client-key-path'];
114119
const namespace = getRequired(args, '--ns');
115120
const taskQueue = getRequired(args, '--task-queue');
116121
const statusPort = args['--status-port'];
@@ -139,12 +144,26 @@ async function main() {
139144
logger,
140145
});
141146

147+
const tlsConfig =
148+
clientCertPath && clientKeyPath
149+
? {
150+
tls: {
151+
clientCertPair: {
152+
crt: readFileSync(clientCertPath),
153+
key: readFileSync(clientKeyPath),
154+
},
155+
},
156+
}
157+
: {};
158+
142159
const clientConnection = await Connection.connect({
143160
address: serverAddress,
161+
...tlsConfig,
144162
});
145163

146164
const connection = await NativeConnection.connect({
147165
address: serverAddress,
166+
...tlsConfig,
148167
});
149168

150169
await withOptionalOtel(args, async () => {
@@ -159,6 +178,9 @@ async function main() {
159178
maxConcurrentWorkflowTaskExecutions,
160179
maxCachedWorkflows,
161180
shutdownGraceTime,
181+
workflowThreadPoolSize,
182+
maxConcurrentActivityTaskPolls,
183+
maxConcurrentWorkflowTaskPolls,
162184
interceptors: {
163185
activityInbound: [() => new ConnectionInjectorInterceptor(clientConnection)],
164186
},

packages/test/src/test-sinks.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,8 @@ if (RUN_INTEGRATION_TESTS) {
162162
...defaultOptions,
163163
taskQueue,
164164
sinks,
165-
maxCachedWorkflows: 1,
166-
maxConcurrentWorkflowTaskExecutions: 1,
165+
maxCachedWorkflows: 0,
166+
maxConcurrentWorkflowTaskExecutions: 2,
167167
});
168168
const client = new WorkflowClient();
169169
await Promise.all([
@@ -179,7 +179,7 @@ if (RUN_INTEGRATION_TESTS) {
179179

180180
t.deepEqual(recordedMessages, [
181181
'Workflow execution started, replaying: false, hl: 3',
182-
'Workflow execution completed, replaying: false, hl: 12',
182+
'Workflow execution completed, replaying: false, hl: 8',
183183
]);
184184
});
185185

@@ -201,8 +201,8 @@ if (RUN_INTEGRATION_TESTS) {
201201
...defaultOptions,
202202
taskQueue,
203203
sinks,
204-
maxCachedWorkflows: 1,
205-
maxConcurrentWorkflowTaskExecutions: 1,
204+
maxCachedWorkflows: 0,
205+
maxConcurrentWorkflowTaskExecutions: 2,
206206
});
207207
const client = new WorkflowClient();
208208
await worker.runUntil(client.execute(workflows.logSinkTester, { taskQueue, workflowId: uuid4() }));
@@ -212,6 +212,6 @@ if (RUN_INTEGRATION_TESTS) {
212212
'Workflow execution started, replaying: false, hl: 3',
213213
'Workflow execution started, replaying: true, hl: 3',
214214
]);
215-
t.is(recordedMessages[recordedMessages.length - 1], 'Workflow execution completed, replaying: false, hl: 12');
215+
t.is(recordedMessages[recordedMessages.length - 1], 'Workflow execution completed, replaying: false, hl: 8');
216216
});
217217
}

0 commit comments

Comments
 (0)