Skip to content

Commit fe00e59

Browse files
authored
feat(worker): Don't forcefully shutdown worker (#1072)
BREAKING CHANGE: `WorkerOptions.shutdownGraceTime` no longer forcefully shuts the worker down. Set `WorkerOptions.shutdownForceTime` to force shutdown. As before, in-flight activities are automatically cancelled on worker shutdown. Setting `shutdownGraceTime` will add a delay before cancelling the activities. Any activities that respond to this synthetic cancellation will fail their current attempt as opposted to ending up cancelled.
1 parent 496b044 commit fe00e59

16 files changed

+233
-206
lines changed

packages/core-bridge/Cargo.lock

Lines changed: 0 additions & 34 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/core-bridge/sdk-core

Submodule sdk-core updated 115 files

packages/core-bridge/src/conversions.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,10 @@ impl ObjectHandleConversionsExt for Handle<'_, JsObject> {
323323
js_optional_getter!(cx, self, "maxTaskQueueActivitiesPerSecond", JsNumber)
324324
.map(|num| num.value(cx) as f64);
325325

326+
let graceful_shutdown_period =
327+
js_optional_getter!(cx, self, "shutdownGraceTimeMs", JsNumber)
328+
.map(|num| Duration::from_millis(num.value(cx) as u64));
329+
326330
match WorkerConfigBuilder::default()
327331
.worker_build_id(js_value_getter!(cx, self, "buildId", JsString))
328332
.client_identity_override(Some(js_value_getter!(cx, self, "identity", JsString)))
@@ -332,6 +336,7 @@ impl ObjectHandleConversionsExt for Handle<'_, JsObject> {
332336
.max_outstanding_local_activities(max_outstanding_local_activities)
333337
.max_cached_workflows(max_cached_workflows)
334338
.sticky_queue_schedule_to_start_timeout(sticky_queue_schedule_to_start_timeout)
339+
.graceful_shutdown_period(graceful_shutdown_period)
335340
.namespace(namespace)
336341
.task_queue(task_queue)
337342
.max_heartbeat_throttle_interval(max_heartbeat_throttle_interval)
@@ -374,7 +379,7 @@ impl ObjectHandleConversionsExt for Handle<'_, JsObject> {
374379

375380
let exec_version = match version.as_str() {
376381
"default" => {
377-
temporal_sdk_core::ephemeral_server::EphemeralExeVersion::Default {
382+
temporal_sdk_core::ephemeral_server::EphemeralExeVersion::SDKDefault {
378383
sdk_name: "sdk-typescript".to_owned(),
379384
sdk_version,
380385
}

packages/core-bridge/src/runtime.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -144,13 +144,16 @@ pub fn start_bridge_loop(
144144
headers,
145145
callback,
146146
} => {
147-
// `metrics_meter` (second arg) can be None here since we don't use the
148-
// returned client directly at the moment, when we repurpose the client to be
149-
// used by a Worker, `init_worker` will attach the correct metrics meter for
150-
// us.
147+
// `metrics_meter` can be None here since we don't use the returned client
148+
// directly at the moment, when we repurpose the client to be used by a Worker,
149+
// `init_worker` will attach the correct metrics meter for us.
151150
core_runtime.tokio_handle().spawn(async move {
151+
let metrics_meter = None;
152152
match options
153-
.connect_no_namespace(None, headers.map(|h| Arc::new(RwLock::new(h))))
153+
.connect_no_namespace(
154+
metrics_meter,
155+
headers.map(|h| Arc::new(RwLock::new(h))),
156+
)
154157
.await
155158
{
156159
Err(err) => {
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import { Client, WorkflowHandle } from '@temporalio/client';
2+
import { QueryDefinition } from '@temporalio/common';
3+
import { getContext } from './interceptors';
4+
5+
function getSchedulingWorkflowHandle(): WorkflowHandle {
6+
const { info, connection, dataConverter } = getContext();
7+
const { workflowExecution } = info;
8+
const client = new Client({ connection, namespace: info.workflowNamespace, dataConverter });
9+
return client.workflow.getHandle(workflowExecution.workflowId, workflowExecution.runId);
10+
}
11+
12+
export async function signalSchedulingWorkflow(signalName: string): Promise<void> {
13+
const handle = getSchedulingWorkflowHandle();
14+
await handle.signal(signalName);
15+
}
16+
17+
export async function queryOwnWf<R, A extends any[]>(queryDef: QueryDefinition<R, A>, ...args: A): Promise<R> {
18+
const handle = getSchedulingWorkflowHandle();
19+
return await handle.query(queryDef, ...args);
20+
}

packages/test/src/activities/index.ts

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
/* eslint-disable @typescript-eslint/no-empty-function */
22
/* eslint-disable @typescript-eslint/explicit-module-boundary-types */
33
import { Context } from '@temporalio/activity';
4-
import { Client, WorkflowHandle } from '@temporalio/client';
5-
import { ApplicationFailure, QueryDefinition } from '@temporalio/common';
4+
import { ApplicationFailure } from '@temporalio/common';
65
import { ProtoActivityInput, ProtoActivityResult } from '../../protos/root';
76
import { cancellableFetch as cancellableFetchInner } from './cancellable-fetch';
87
import { fakeProgress as fakeProgressInner } from './fake-progress';
9-
import { getContext } from './interceptors';
8+
import { signalSchedulingWorkflow } from './helpers';
109

10+
export { queryOwnWf, signalSchedulingWorkflow } from './helpers';
1111
export { throwSpecificError } from './failure-tester';
1212

1313
/**
@@ -63,23 +63,6 @@ export async function waitForCancellation(): Promise<void> {
6363
await Context.current().cancelled;
6464
}
6565

66-
function getSchedulingWorkflowHandle(): WorkflowHandle {
67-
const { info, connection, dataConverter } = getContext();
68-
const { workflowExecution } = info;
69-
const client = new Client({ connection, namespace: info.workflowNamespace, dataConverter });
70-
return client.workflow.getHandle(workflowExecution.workflowId, workflowExecution.runId);
71-
}
72-
73-
export async function signalSchedulingWorkflow(signalName: string) {
74-
const handle = getSchedulingWorkflowHandle();
75-
await handle.signal(signalName);
76-
}
77-
78-
export async function queryOwnWf<R, A extends any[]>(queryDef: QueryDefinition<R, A>, ...args: A): Promise<R> {
79-
const handle = getSchedulingWorkflowHandle();
80-
return await handle.query(queryDef, ...args);
81-
}
82-
8366
export async function fakeProgress(sleepIntervalMs = 1000, numIters = 1000): Promise<void> {
8467
await signalSchedulingWorkflow('activityStarted');
8568
await fakeProgressInner(sleepIntervalMs, numIters);

packages/test/src/integration-tests.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
119119
// NOTE: at the time this was added temporalite did not expose the grpc OperatorService.
120120
try {
121121
await connection.operatorService.addSearchAttributes({
122+
namespace: 'default',
122123
searchAttributes: {
123124
CustomIntField: iface.temporal.api.enums.v1.IndexedValueType.INDEXED_VALUE_TYPE_INT,
124125
CustomBoolField: iface.temporal.api.enums.v1.IndexedValueType.INDEXED_VALUE_TYPE_BOOL,

packages/test/src/test-activities-are-cancelled-on-worker-shutdown.ts

Lines changed: 0 additions & 51 deletions
This file was deleted.

0 commit comments

Comments
 (0)