Skip to content

Commit 28d4de3

Browse files
feat(client): Add startDelay to workflow (signal) start options (#1300)
1 parent 7bd36df commit 28d4de3

File tree

7 files changed

+128
-83
lines changed

7 files changed

+128
-83
lines changed

packages/client/src/workflow-client.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -688,6 +688,7 @@ export class WorkflowClient extends BaseClient {
688688
workflowExecutionTimeout: options.workflowExecutionTimeout,
689689
workflowRunTimeout: options.workflowRunTimeout,
690690
workflowTaskTimeout: options.workflowTaskTimeout,
691+
workflowStartDelay: options.startDelay,
691692
retryPolicy: options.retry ? compileRetryPolicy(options.retry) : undefined,
692693
memo: options.memo ? { fields: await encodeMapToPayloads(this.dataConverter, options.memo) } : undefined,
693694
searchAttributes: options.searchAttributes
@@ -735,6 +736,7 @@ export class WorkflowClient extends BaseClient {
735736
workflowExecutionTimeout: opts.workflowExecutionTimeout,
736737
workflowRunTimeout: opts.workflowRunTimeout,
737738
workflowTaskTimeout: opts.workflowTaskTimeout,
739+
workflowStartDelay: opts.startDelay,
738740
retryPolicy: opts.retry ? compileRetryPolicy(opts.retry) : undefined,
739741
memo: opts.memo ? { fields: await encodeMapToPayloads(this.dataConverter, opts.memo) } : undefined,
740742
searchAttributes: opts.searchAttributes

packages/client/src/workflow-options.ts

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
1-
import {
2-
CommonWorkflowOptions,
3-
SignalDefinition,
4-
WithCompiledWorkflowOptions,
5-
WithWorkflowArgs,
6-
Workflow,
7-
} from '@temporalio/common';
1+
import { CommonWorkflowOptions, SignalDefinition, WithWorkflowArgs, Workflow } from '@temporalio/common';
2+
import { Duration, msOptionalToTs } from '@temporalio/common/lib/time';
3+
import { Replace } from '@temporalio/common/lib/type-helpers';
4+
import { google } from '@temporalio/proto';
85

96
export * from '@temporalio/common/lib/workflow-options';
107

@@ -37,6 +34,35 @@ export interface WorkflowOptions extends CommonWorkflowOptions {
3734
* @default true
3835
*/
3936
followRuns?: boolean;
37+
38+
/**
39+
* Amount of time to wait before starting the workflow.
40+
*
41+
* @experimental
42+
*/
43+
startDelay?: Duration;
44+
}
45+
46+
export type WithCompiledWorkflowOptions<T extends WorkflowOptions> = Replace<
47+
T,
48+
{
49+
workflowExecutionTimeout?: google.protobuf.IDuration;
50+
workflowRunTimeout?: google.protobuf.IDuration;
51+
workflowTaskTimeout?: google.protobuf.IDuration;
52+
startDelay?: google.protobuf.IDuration;
53+
}
54+
>;
55+
56+
export function compileWorkflowOptions<T extends WorkflowOptions>(options: T): WithCompiledWorkflowOptions<T> {
57+
const { workflowExecutionTimeout, workflowRunTimeout, workflowTaskTimeout, startDelay, ...rest } = options;
58+
59+
return {
60+
...rest,
61+
workflowExecutionTimeout: msOptionalToTs(workflowExecutionTimeout),
62+
workflowRunTimeout: msOptionalToTs(workflowRunTimeout),
63+
workflowTaskTimeout: msOptionalToTs(workflowTaskTimeout),
64+
startDelay: msOptionalToTs(startDelay),
65+
};
4066
}
4167

4268
export type WorkflowSignalWithStartOptions<SignalArgs extends any[] = []> = SignalArgs extends [any, ...any[]]

packages/common/src/workflow-options.ts

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
import type { temporal, google } from '@temporalio/proto';
1+
import type { temporal } from '@temporalio/proto';
22
import { SearchAttributes, Workflow } from './interfaces';
33
import { RetryPolicy } from './retry-policy';
4-
import { Duration, msOptionalToTs } from './time';
5-
import { checkExtends, Replace } from './type-helpers';
4+
import { Duration } from './time';
5+
import { checkExtends } from './type-helpers';
66

77
// Avoid importing the proto implementation to reduce workflow bundle size
88
// Copied from temporal.api.enums.v1.WorkflowIdReusePolicy
@@ -135,26 +135,6 @@ export interface WorkflowDurationOptions {
135135

136136
export type CommonWorkflowOptions = BaseWorkflowOptions & WorkflowDurationOptions;
137137

138-
export type WithCompiledWorkflowOptions<T extends CommonWorkflowOptions> = Replace<
139-
T,
140-
{
141-
workflowExecutionTimeout?: google.protobuf.IDuration;
142-
workflowRunTimeout?: google.protobuf.IDuration;
143-
workflowTaskTimeout?: google.protobuf.IDuration;
144-
}
145-
>;
146-
147-
export function compileWorkflowOptions<T extends CommonWorkflowOptions>(options: T): WithCompiledWorkflowOptions<T> {
148-
const { workflowExecutionTimeout, workflowRunTimeout, workflowTaskTimeout, ...rest } = options;
149-
150-
return {
151-
...rest,
152-
workflowExecutionTimeout: msOptionalToTs(workflowExecutionTimeout),
153-
workflowRunTimeout: msOptionalToTs(workflowRunTimeout),
154-
workflowTaskTimeout: msOptionalToTs(workflowTaskTimeout),
155-
};
156-
}
157-
158138
export function extractWorkflowType<T extends Workflow>(workflowTypeOrFunc: string | T): string {
159139
if (typeof workflowTypeOrFunc === 'string') return workflowTypeOrFunc as string;
160140
if (typeof workflowTypeOrFunc === 'function') {

packages/core-bridge/src/conversions.rs

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,21 @@ use neon::{
55
prelude::*,
66
types::{JsBoolean, JsNumber, JsString},
77
};
8-
use std::{collections::HashMap, net::SocketAddr, time::Duration, sync::Arc};
8+
use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
99
use temporal_sdk_core::{
10-
api::telemetry::{
11-
Logger, MetricTemporality, TelemetryOptions,
12-
TelemetryOptionsBuilder,
10+
api::telemetry::{Logger, MetricTemporality, TelemetryOptions, TelemetryOptionsBuilder},
11+
api::{
12+
telemetry::{
13+
metrics::CoreMeter, OtelCollectorOptionsBuilder, PrometheusExporterOptionsBuilder,
14+
},
15+
worker::{WorkerConfig, WorkerConfigBuilder},
1316
},
14-
api::{worker::{WorkerConfig, WorkerConfigBuilder}, telemetry::{PrometheusExporterOptionsBuilder, metrics::CoreMeter, OtelCollectorOptionsBuilder}},
1517
ephemeral_server::{
1618
TemporalDevServerConfig, TemporalDevServerConfigBuilder, TestServerConfig,
1719
TestServerConfigBuilder,
1820
},
19-
ClientOptions, ClientOptionsBuilder, ClientTlsConfig, RetryConfig, TlsConfig, Url, telemetry::{start_prometheus_metric_exporter, build_otlp_metric_exporter},
21+
telemetry::{build_otlp_metric_exporter, start_prometheus_metric_exporter},
22+
ClientOptions, ClientOptionsBuilder, ClientTlsConfig, RetryConfig, TlsConfig, Url,
2023
};
2124

2225
pub enum EphemeralServerConfig {
@@ -163,7 +166,9 @@ impl ObjectHandleConversionsExt for Handle<'_, JsObject> {
163166

164167
fn as_telemetry_options(&self, cx: &mut FunctionContext) -> NeonResult<TelemetryOptions> {
165168
let mut telemetry_opts = TelemetryOptionsBuilder::default();
166-
if js_optional_value_getter!(cx, self, "noTemporalPrefixForMetrics", JsBoolean).unwrap_or_default() {
169+
if js_optional_value_getter!(cx, self, "noTemporalPrefixForMetrics", JsBoolean)
170+
.unwrap_or_default()
171+
{
167172
telemetry_opts.metric_prefix("".to_string());
168173
}
169174

@@ -193,35 +198,45 @@ impl ObjectHandleConversionsExt for Handle<'_, JsObject> {
193198
let addr = js_value_getter!(cx, prom, "bindAddress", JsString);
194199
match addr.parse::<SocketAddr>() {
195200
Ok(addr) => options.socket_addr(addr),
196-
Err(_) => return cx.throw_type_error(
197-
"Invalid telemetryOptions.metrics.prometheus.bindAddress",
198-
)?,
201+
Err(_) => {
202+
return cx.throw_type_error(
203+
"Invalid telemetryOptions.metrics.prometheus.bindAddress",
204+
)?
205+
}
199206
};
200207

201-
202-
let options = options.build().expect("Failed to build prometheus exporter options");
203-
let prom_info = start_prometheus_metric_exporter(options).expect("Failed creating prometheus exporter");
208+
let options = options
209+
.build()
210+
.expect("Failed to build prometheus exporter options");
211+
let prom_info = start_prometheus_metric_exporter(options)
212+
.expect("Failed creating prometheus exporter");
204213
telemetry_opts.metrics(prom_info.meter as Arc<dyn CoreMeter>);
205-
206214
} else if let Some(ref otel) = js_optional_getter!(cx, metrics, "otel", JsObject) {
207215
let mut options = OtelCollectorOptionsBuilder::default();
208216

209217
let url = js_value_getter!(cx, otel, "url", JsString);
210218
match Url::parse(&url) {
211219
Ok(url) => options.url(url),
212-
Err(_) => return cx.throw_type_error("Invalid telemetryOptions.metrics.otel.url"),
220+
Err(_) => {
221+
return cx.throw_type_error("Invalid telemetryOptions.metrics.otel.url")
222+
}
213223
};
214224

215225
if let Some(ref headers) = js_optional_getter!(cx, otel, "headers", JsObject) {
216226
options.headers(headers.as_hash_map_of_string_to_string(cx)?);
217227
};
218228

219-
if let Some(metric_periodicity) = js_optional_value_getter!(cx, otel, "metricsExportInterval", JsNumber).map(|f| f as u64) {
229+
if let Some(metric_periodicity) =
230+
js_optional_value_getter!(cx, otel, "metricsExportInterval", JsNumber)
231+
.map(|f| f as u64)
232+
{
220233
options.metric_periodicity(Duration::from_millis(metric_periodicity));
221234
}
222235

223236
// FIXME: Move temporality to the otel object
224-
if let Some(temporality) = js_optional_value_getter!(cx, metrics, "temporality", JsString) {
237+
if let Some(temporality) =
238+
js_optional_value_getter!(cx, metrics, "temporality", JsString)
239+
{
225240
match temporality.as_str() {
226241
"cumulative" => options.metric_temporality(MetricTemporality::Cumulative),
227242
"delta" => options.metric_temporality(MetricTemporality::Delta),
@@ -231,8 +246,11 @@ impl ObjectHandleConversionsExt for Handle<'_, JsObject> {
231246
};
232247
};
233248

234-
let options = options.build().expect("Failed to build otlp exporter options");
235-
let otlp_exporter = build_otlp_metric_exporter(options).expect("Failed to build otlp exporter");
249+
let options = options
250+
.build()
251+
.expect("Failed to build otlp exporter options");
252+
let otlp_exporter =
253+
build_otlp_metric_exporter(options).expect("Failed to build otlp exporter");
236254
telemetry_opts.metrics(Arc::new(otlp_exporter) as Arc<dyn CoreMeter>);
237255
} else {
238256
cx.throw_type_error(

packages/core-bridge/src/helpers.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -189,27 +189,31 @@ where
189189
}
190190

191191
// Recursively convert a Serde value to a JS value
192-
pub fn serde_value_to_js_value<'a>(cx: &mut impl Context<'a>, val: serde_json::Value) -> JsResult<'a, JsValue> {
192+
pub fn serde_value_to_js_value<'a>(
193+
cx: &mut impl Context<'a>,
194+
val: serde_json::Value,
195+
) -> JsResult<'a, JsValue> {
193196
match val {
194197
serde_json::Value::String(s) => Ok(cx.string(s).upcast()),
195198
serde_json::Value::Number(n) => Ok(cx.number(n.as_f64().unwrap()).upcast()),
196199
serde_json::Value::Bool(b) => Ok(cx.boolean(b).upcast()),
197200
serde_json::Value::Null => Ok(cx.null().upcast()),
198-
serde_json::Value::Array(vec ) => {
201+
serde_json::Value::Array(vec) => {
199202
let arr: Handle<'a, JsArray> = JsArray::new(cx, vec.len() as u32);
200203
for (i, v) in vec.into_iter().enumerate() {
201204
let v = serde_value_to_js_value(cx, v)?;
202205
arr.set(cx, i as u32, v)?;
203206
}
204207
Ok(arr.upcast())
205208
}
206-
serde_json::Value::Object(map) => {
207-
hashmap_to_js_value(cx, map).map(|v| v.upcast())
208-
}
209+
serde_json::Value::Object(map) => hashmap_to_js_value(cx, map).map(|v| v.upcast()),
209210
}
210211
}
211212

212-
pub fn hashmap_to_js_value<'a>(cx: &mut impl Context<'a>, map: impl IntoIterator<Item = (String, serde_json::Value)>) -> JsResult<'a, JsObject> {
213+
pub fn hashmap_to_js_value<'a>(
214+
cx: &mut impl Context<'a>,
215+
map: impl IntoIterator<Item = (String, serde_json::Value)>,
216+
) -> JsResult<'a, JsObject> {
213217
let obj: Handle<'a, JsObject> = cx.empty_object();
214218
for (k, v) in map {
215219
let k = cx.string(snake_to_camel(k));
@@ -228,7 +232,7 @@ fn snake_to_camel(input: String) -> String {
228232
result.push_str(&input[..first]);
229233
}
230234
let mut capitalize = true;
231-
for c in input[first+1..].chars() {
235+
for c in input[first + 1..].chars() {
232236
if c == '_' {
233237
capitalize = true;
234238
} else if capitalize {
@@ -251,6 +255,9 @@ mod tests {
251255
fn snake_to_camel_works() {
252256
assert_eq!(snake_to_camel("this_is_a_test".into()), "thisIsATest");
253257
assert_eq!(snake_to_camel("this___IS_a_TEST".into()), "thisIsATest");
254-
assert_eq!(snake_to_camel("éàç_this_is_a_test".into()), "éàçThisIsATest");
258+
assert_eq!(
259+
snake_to_camel("éàç_this_is_a_test".into()),
260+
"éàçThisIsATest"
261+
);
255262
}
256263
}

packages/core-bridge/src/worker.rs

Lines changed: 7 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -83,19 +83,11 @@ pub async fn start_worker_loop(
8383
worker.initiate_shutdown();
8484
send_result(channel, callback, |cx| Ok(cx.undefined()));
8585
}
86-
WorkerRequest::PollWorkflowActivation {
87-
callback,
88-
} => {
89-
handle_poll_workflow_activation_request(
90-
worker, channel, callback,
91-
)
92-
.await
86+
WorkerRequest::PollWorkflowActivation { callback } => {
87+
handle_poll_workflow_activation_request(worker, channel, callback).await
9388
}
94-
WorkerRequest::PollActivityTask {
95-
callback,
96-
} => {
97-
handle_poll_activity_task_request(worker, channel, callback)
98-
.await
89+
WorkerRequest::PollActivityTask { callback } => {
90+
handle_poll_activity_task_request(worker, channel, callback).await
9991
}
10092
WorkerRequest::CompleteWorkflowActivation {
10193
completion,
@@ -104,11 +96,7 @@ pub async fn start_worker_loop(
10496
void_future_to_js(
10597
channel,
10698
callback,
107-
async move {
108-
worker
109-
.complete_workflow_activation(completion)
110-
.await
111-
},
99+
async move { worker.complete_workflow_activation(completion).await },
112100
|cx, err| -> JsResult<JsObject> {
113101
match err {
114102
CompleteWfError::MalformedWorkflowCompletion {
@@ -126,11 +114,7 @@ pub async fn start_worker_loop(
126114
void_future_to_js(
127115
channel,
128116
callback,
129-
async move {
130-
worker
131-
.complete_activity_task(completion)
132-
.await
133-
},
117+
async move { worker.complete_activity_task(completion).await },
134118
|cx, err| -> JsResult<JsObject> {
135119
match err {
136120
CompleteActivityError::MalformedActivityCompletion {
@@ -158,10 +142,7 @@ async fn handle_poll_workflow_activation_request(
158142
channel: Arc<Channel>,
159143
callback: Root<JsFunction>,
160144
) {
161-
match worker
162-
.poll_workflow_activation()
163-
.await
164-
{
145+
match worker.poll_workflow_activation().await {
165146
Ok(task) => {
166147
send_result(channel, callback, move |cx| {
167148
let len = task.encoded_len();

packages/test/src/test-integration-workflows.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { randomUUID } from 'crypto';
12
import { firstValueFrom, Subject } from 'rxjs';
23
import { WorkflowFailedError } from '@temporalio/client';
34
import * as activity from '@temporalio/activity';
@@ -6,6 +7,7 @@ import { CancelReason } from '@temporalio/worker/lib/activity';
67
import * as workflow from '@temporalio/workflow';
78
import { signalSchedulingWorkflow } from './activities/helpers';
89
import { activityStartedSignal } from './workflows/definitions';
10+
import * as workflows from './workflows';
911
import { helpers, makeTestFunction } from './helpers-integration';
1012

1113
const test = makeTestFunction({ workflowsPath: __filename });
@@ -191,3 +193,32 @@ test('Activity initialInterval is not getting rounded', async (t) => {
191193
const retryPolicy = activityTaskScheduledEvents?.activityTaskScheduledEventAttributes?.retryPolicy;
192194
t.is(tsToMs(retryPolicy?.initialInterval), 50);
193195
});
196+
197+
test('Start of workflow is delayed', async (t) => {
198+
const { startWorkflow } = helpers(t);
199+
// This workflow never runs
200+
const handle = await startWorkflow(runTestActivity, {
201+
startDelay: '5678s',
202+
});
203+
const { events } = await handle.fetchHistory();
204+
const workflowExecutionStartedEvent = events?.find((ev) => ev.workflowExecutionStartedEventAttributes);
205+
const startDelay = workflowExecutionStartedEvent?.workflowExecutionStartedEventAttributes?.firstWorkflowTaskBackoff;
206+
t.is(tsToMs(startDelay), 5678000);
207+
});
208+
209+
test('Start of workflow with signal is delayed', async (t) => {
210+
const { taskQueue } = helpers(t);
211+
// This workflow never runs
212+
const handle = await t.context.env.client.workflow.signalWithStart(workflows.interruptableWorkflow, {
213+
workflowId: randomUUID(),
214+
taskQueue,
215+
startDelay: '4678s',
216+
signal: workflows.interruptSignal,
217+
signalArgs: ['Never called'],
218+
});
219+
220+
const { events } = await handle.fetchHistory();
221+
const workflowExecutionStartedEvent = events?.find((ev) => ev.workflowExecutionStartedEventAttributes);
222+
const startDelay = workflowExecutionStartedEvent?.workflowExecutionStartedEventAttributes?.firstWorkflowTaskBackoff;
223+
t.is(tsToMs(startDelay), 4678000);
224+
});

0 commit comments

Comments
 (0)