Skip to content

Commit d3fe9bd

Browse files
Add Build ID to workflow info (#1333)
Co-authored-by: James Watkins-Harvey <james.watkinsharvey@temporal.io> Co-authored-by: James Watkins-Harvey <mjameswh@users.noreply.github.com>
1 parent 4a2af02 commit d3fe9bd

File tree

10 files changed

+187
-144
lines changed

10 files changed

+187
-144
lines changed

packages/core-bridge/Cargo.lock

Lines changed: 110 additions & 129 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/core-bridge/src/conversions.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,22 +37,22 @@ impl ArrayHandleConversionsExt for Handle<'_, JsArray> {
3737
let len = js_vec.len();
3838
let mut ret_vec = Vec::<String>::with_capacity(len);
3939

40-
for i in 0..len {
41-
ret_vec.push(js_vec[i].downcast_or_throw::<JsString, _>(cx)?.value(cx));
40+
for i in js_vec.iter().take(len) {
41+
ret_vec.push(i.downcast_or_throw::<JsString, _>(cx)?.value(cx));
4242
}
4343
Ok(ret_vec)
4444
}
4545
}
4646

47-
type TelemOptsRes = NeonResult<(
47+
pub(crate) type TelemOptsRes = (
4848
TelemetryOptions,
4949
Option<Box<dyn FnOnce() -> Arc<dyn CoreMeter> + Send>>,
50-
)>;
50+
);
5151

5252
pub trait ObjectHandleConversionsExt {
5353
fn set_default(&self, cx: &mut FunctionContext, key: &str, value: &str) -> NeonResult<()>;
5454
fn as_client_options(&self, ctx: &mut FunctionContext) -> NeonResult<ClientOptions>;
55-
fn as_telemetry_options(&self, cx: &mut FunctionContext) -> TelemOptsRes;
55+
fn as_telemetry_options(&self, cx: &mut FunctionContext) -> NeonResult<TelemOptsRes>;
5656
fn as_worker_config(&self, cx: &mut FunctionContext) -> NeonResult<WorkerConfig>;
5757
fn as_ephemeral_server_config(
5858
&self,
@@ -169,7 +169,7 @@ impl ObjectHandleConversionsExt for Handle<'_, JsObject> {
169169
.expect("Core server gateway options must be valid"))
170170
}
171171

172-
fn as_telemetry_options(&self, cx: &mut FunctionContext) -> TelemOptsRes {
172+
fn as_telemetry_options(&self, cx: &mut FunctionContext) -> NeonResult<TelemOptsRes> {
173173
let mut telemetry_opts = TelemetryOptionsBuilder::default();
174174
if js_optional_value_getter!(cx, self, "noTemporalPrefixForMetrics", JsBoolean)
175175
.unwrap_or_default()

packages/core-bridge/src/runtime.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,7 @@ use std::{
1111
time::{Duration, SystemTime, UNIX_EPOCH},
1212
};
1313
use temporal_client::{ClientInitError, ConfiguredClient, TemporalServiceClientWithMetrics};
14-
use temporal_sdk_core::api::telemetry::metrics::CoreMeter;
15-
use temporal_sdk_core::api::telemetry::{CoreTelemetry, TelemetryOptions};
14+
use temporal_sdk_core::api::telemetry::CoreTelemetry;
1615
use temporal_sdk_core::CoreRuntime;
1716
use temporal_sdk_core::{
1817
ephemeral_server::EphemeralServer as CoreEphemeralServer,
@@ -117,10 +116,7 @@ pub enum RuntimeRequest {
117116
/// Bridges requests from JS to core and sends responses back to JS using a neon::Channel.
118117
/// Blocks current thread until a [Shutdown] request is received in channel.
119118
pub fn start_bridge_loop(
120-
telemetry_options: (
121-
TelemetryOptions,
122-
Option<Box<dyn FnOnce() -> Arc<dyn CoreMeter> + Send>>,
123-
),
119+
telemetry_options: TelemOptsRes,
124120
channel: Arc<Channel>,
125121
receiver: &mut UnboundedReceiver<RuntimeRequest>,
126122
) {

packages/test/src/integration-tests-old.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -708,6 +708,7 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
708708
historySize: result.historySize,
709709
startTime: result.startTime,
710710
runStartTime: result.runStartTime,
711+
currentBuildId: result.currentBuildId,
711712
// unsafe.now is a function, so doesn't make it through serialization, but .now is required, so we need to cast
712713
unsafe: { isReplaying: false } as UnsafeWorkflowInfo,
713714
});

packages/test/src/test-integration-workflows.ts

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import * as activity from '@temporalio/activity';
55
import { tsToMs } from '@temporalio/common/lib/time';
66
import { CancelReason } from '@temporalio/worker/lib/activity';
77
import * as workflow from '@temporalio/workflow';
8+
import { defineQuery, defineSignal } from '@temporalio/workflow';
89
import { signalSchedulingWorkflow } from './activities/helpers';
910
import { activityStartedSignal } from './workflows/definitions';
1011
import * as workflows from './workflows';
@@ -226,7 +227,10 @@ test('Start of workflow with signal is delayed', async (t) => {
226227
export async function executeEagerActivity(): Promise<void> {
227228
const scheduleActivity = () =>
228229
workflow
229-
.proxyActivities({ scheduleToCloseTimeout: '5s', allowEagerDispatch: true })
230+
.proxyActivities({
231+
scheduleToCloseTimeout: '5s',
232+
allowEagerDispatch: true,
233+
})
230234
.testActivity()
231235
.then((res) => {
232236
if (res !== 'workflow-and-activity-worker')
@@ -305,3 +309,53 @@ test("Worker doesn't request Eager Activity Dispatch if no activities are regist
305309
t.is(activityTaskStarted?.length, 1);
306310
t.is(activityTaskStarted?.[0]?.activityTaskStartedEventAttributes?.attempt, 1);
307311
});
312+
313+
const unblockSignal = defineSignal('unblock');
314+
const getBuildIdQuery = defineQuery<string>('getBuildId');
315+
316+
export async function buildIdTester(): Promise<void> {
317+
let blocked = true;
318+
workflow.setHandler(unblockSignal, () => {
319+
blocked = false;
320+
});
321+
322+
workflow.setHandler(getBuildIdQuery, () => {
323+
return workflow.workflowInfo().currentBuildId ?? '';
324+
});
325+
326+
// The unblock signal will only be sent once we are in Worker 1.1.
327+
// Therefore, up to this point, we are runing in Worker 1.0
328+
await workflow.condition(() => !blocked);
329+
// From this point on, we are runing in Worker 1.1
330+
331+
// Prevent workflow completion
332+
await workflow.condition(() => false);
333+
}
334+
335+
test('Build Id appropriately set in workflow info', async (t) => {
336+
const { taskQueue, createWorker } = helpers(t);
337+
const wfid = `${taskQueue}-` + randomUUID();
338+
const client = t.context.env.client;
339+
340+
const worker1 = await createWorker({ buildId: '1.0' });
341+
await worker1.runUntil(async () => {
342+
const handle = await client.workflow.start(buildIdTester, {
343+
taskQueue,
344+
workflowId: wfid,
345+
});
346+
t.is(await handle.query(getBuildIdQuery), '1.0');
347+
});
348+
349+
await client.workflowService.resetStickyTaskQueue({
350+
namespace: worker1.options.namespace,
351+
execution: { workflowId: wfid },
352+
});
353+
354+
const worker2 = await createWorker({ buildId: '1.1' });
355+
await worker2.runUntil(async () => {
356+
const handle = await client.workflow.getHandle(wfid);
357+
t.is(await handle.query(getBuildIdQuery), '1.0');
358+
await handle.signal(unblockSignal);
359+
t.is(await handle.query(getBuildIdQuery), '1.1');
360+
});
361+
});

packages/test/src/test-sinks.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ if (RUN_INTEGRATION_TESTS) {
116116
});
117117

118118
// Capture volatile values that are hard to predict
119-
const { historySize, startTime, runStartTime } = recordedCalls[0].info;
119+
const { historySize, startTime, runStartTime, currentBuildId } = recordedCalls[0].info;
120120
t.true(historySize > 300);
121121

122122
const info: WorkflowInfo = {
@@ -146,6 +146,7 @@ if (RUN_INTEGRATION_TESTS) {
146146
historySize,
147147
startTime,
148148
runStartTime,
149+
currentBuildId,
149150
// unsafe.now() doesn't make it through serialization, but .now is required, so we need to cast
150151
unsafe: {
151152
isReplaying: false,

packages/worker/src/worker.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1224,6 +1224,7 @@ export class Worker {
12241224
// A zero value means that it was not set by the server
12251225
historySize: activation.historySizeBytes.toNumber(),
12261226
continueAsNewSuggested: activation.continueAsNewSuggested,
1227+
currentBuildId: activation.buildIdForCurrentTask,
12271228
unsafe: {
12281229
now: () => Date.now(), // re-set in initRuntime
12291230
isReplaying: activation.isReplaying,

packages/workflow/src/interfaces.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,14 @@ export interface WorkflowInfo {
162162
*/
163163
readonly cronScheduleToScheduleInterval?: number;
164164

165+
/**
166+
* The Build ID of the worker which executed the current Workflow Task. May be undefined if the
167+
* task was completed by a worker without a Build ID. If this worker is the one executing this
168+
* task for the first time and has a Build ID set, then its ID will be used. This value may change
169+
* over the lifetime of the workflow run, but is deterministic and safe to use for branching.
170+
*/
171+
readonly currentBuildId?: string;
172+
165173
readonly unsafe: UnsafeWorkflowInfo;
166174
}
167175

packages/workflow/src/worker-interface.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ export function activate(activation: coresdk.workflow_activation.WorkflowActivat
199199
// historySize === 0 means WFT was generated by pre-1.20.0 server, and the history size is unknown
200200
historySize: activation.historySizeBytes?.toNumber() || 0,
201201
continueAsNewSuggested: activation.continueAsNewSuggested ?? false,
202+
currentBuildId: activation.buildIdForCurrentTask ?? undefined,
202203
unsafe: {
203204
...info.unsafe,
204205
isReplaying: activation.isReplaying ?? false,

0 commit comments

Comments
 (0)