Skip to content

Commit 29e8037

Browse files
feat(worker): Add a workflow metadata query (#1319)
Co-authored-by: James Watkins-Harvey <mjameswh@users.noreply.github.com>
1 parent ccef367 commit 29e8037

File tree

8 files changed

+161
-45
lines changed

8 files changed

+161
-45
lines changed

packages/common/src/interfaces.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,15 @@ export type Payload = temporal.api.common.v1.IPayload;
66
export type WorkflowReturnType = Promise<any>;
77
export type WorkflowUpdateType = (...args: any[]) => Promise<any> | any;
88
export type WorkflowUpdateValidatorType = (...args: any[]) => void;
9+
export type WorkflowUpdateAnnotatedType = {
10+
handler: WorkflowUpdateType;
11+
validator?: WorkflowUpdateValidatorType;
12+
description?: string;
13+
};
914
export type WorkflowSignalType = (...args: any[]) => Promise<void> | void;
15+
export type WorkflowSignalAnnotatedType = { handler: WorkflowSignalType; description?: string };
1016
export type WorkflowQueryType = (...args: any[]) => any;
17+
export type WorkflowQueryAnnotatedType = { handler: WorkflowQueryType; description?: string };
1118

1219
/**
1320
* Broad Workflow function definition, specific Workflows will typically use a narrower type definition, e.g:

packages/proto/scripts/compile-proto.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ const coreProtoPath = resolve(protoBaseDir, 'local/temporal/sdk/core/core_interf
1515
const workflowServiceProtoPath = resolve(protoBaseDir, 'api_upstream/temporal/api/workflowservice/v1/service.proto');
1616
const operatorServiceProtoPath = resolve(protoBaseDir, 'api_upstream/temporal/api/operatorservice/v1/service.proto');
1717
const errorDetailsProtoPath = resolve(protoBaseDir, 'api_upstream/temporal/api/errordetails/v1/message.proto');
18+
const workflowMetadataProtoPath = resolve(protoBaseDir, 'api_upstream/temporal/api/sdk/v1/workflow_metadata.proto');
1819
const testServiceRRProtoPath = resolve(
1920
protoBaseDir,
2021
'testsrv_upstream/temporal/api/testservice/v1/request_response.proto'
@@ -51,6 +52,7 @@ async function compileProtos(dtsOutputFile, ...args) {
5152
workflowServiceProtoPath,
5253
operatorServiceProtoPath,
5354
errorDetailsProtoPath,
55+
workflowMetadataProtoPath,
5456
testServiceRRProtoPath,
5557
testServiceProtoPath,
5658
healthServiceProtoPath,

packages/test/src/integration-tests-old.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
398398
await t.throwsAsync(workflow.query('not found'), {
399399
instanceOf: QueryNotRegisteredError,
400400
message:
401-
'Workflow did not register a handler for not found. Registered queries: [__stack_trace __enhanced_stack_trace isBlocked]',
401+
'Workflow did not register a handler for not found. Registered queries: [__stack_trace __enhanced_stack_trace __temporal_workflow_metadata isBlocked]',
402402
});
403403
});
404404

packages/test/src/test-integration-workflows.ts

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,46 @@ test('Start of workflow with signal is delayed', async (t) => {
224224
t.is(tsToMs(startDelay), 4678000);
225225
});
226226

227+
export async function queryWorkflowMetadata(): Promise<void> {
228+
const dummyQuery1 = workflow.defineQuery<void>('dummyQuery1');
229+
const dummyQuery2 = workflow.defineQuery<void>('dummyQuery2');
230+
const dummyQuery3 = workflow.defineQuery<void>('dummyQuery3');
231+
const dummySignal1 = workflow.defineSignal('dummySignal1');
232+
const dummyUpdate1 = workflow.defineUpdate<void>('dummyUpdate1');
233+
234+
workflow.setHandler(dummyQuery1, () => void {}, { description: 'ignore' });
235+
// Override description
236+
workflow.setHandler(dummyQuery1, () => void {}, { description: 'query1' });
237+
workflow.setHandler(dummyQuery2, () => void {}, { description: 'query2' });
238+
workflow.setHandler(dummyQuery3, () => void {}, { description: 'query3' });
239+
// Remove handler
240+
workflow.setHandler(dummyQuery3, undefined);
241+
workflow.setHandler(dummySignal1, () => void {}, { description: 'signal1' });
242+
workflow.setHandler(dummyUpdate1, () => void {}, { description: 'update1' });
243+
await workflow.condition(() => false);
244+
}
245+
246+
test('Query workflow metadata returns handler descriptions', async (t) => {
247+
const { createWorker, startWorkflow } = helpers(t);
248+
249+
const worker = await createWorker();
250+
251+
await worker.runUntil(async () => {
252+
const handle = await startWorkflow(queryWorkflowMetadata);
253+
const meta = await handle.query(workflow.workflowMetadataQuery);
254+
t.is(meta.definition?.type, 'queryWorkflowMetadata');
255+
const queryDefinitions = meta.definition?.queryDefinitions;
256+
// Three built-in ones plus dummyQuery1 and dummyQuery2
257+
t.is(queryDefinitions?.length, 5);
258+
t.deepEqual(queryDefinitions?.[3], { name: 'dummyQuery1', description: 'query1' });
259+
t.deepEqual(queryDefinitions?.[4], { name: 'dummyQuery2', description: 'query2' });
260+
const signalDefinitions = meta.definition?.signalDefinitions;
261+
t.deepEqual(signalDefinitions, [{ name: 'dummySignal1', description: 'signal1' }]);
262+
const updateDefinitions = meta.definition?.updateDefinitions;
263+
t.deepEqual(updateDefinitions, [{ name: 'dummyUpdate1', description: 'update1' }]);
264+
});
265+
});
266+
227267
export async function executeEagerActivity(): Promise<void> {
228268
const scheduleActivity = () =>
229269
workflow

packages/test/src/test-workflows.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1922,10 +1922,10 @@ test('query not found - successString', async (t) => {
19221922
queryId: 'qid',
19231923
failed: {
19241924
message:
1925-
'Workflow did not register a handler for not-found. Registered queries: [__stack_trace __enhanced_stack_trace]',
1925+
'Workflow did not register a handler for not-found. Registered queries: [__stack_trace __enhanced_stack_trace __temporal_workflow_metadata]',
19261926
source: 'TypeScriptSDK',
19271927
stackTrace:
1928-
'ReferenceError: Workflow did not register a handler for not-found. Registered queries: [__stack_trace __enhanced_stack_trace]',
1928+
'ReferenceError: Workflow did not register a handler for not-found. Registered queries: [__stack_trace __enhanced_stack_trace __temporal_workflow_metadata]',
19291929
applicationFailureInfo: {
19301930
type: 'ReferenceError',
19311931
nonRetryable: false,

packages/workflow/src/interfaces.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,3 +464,18 @@ export type DefaultSignalHandler = (signalName: string, ...args: unknown[]) => v
464464
* A validation function capable of accepting the arguments for a given UpdateDefinition.
465465
*/
466466
export type UpdateValidator<Args extends any[]> = (...args: Args) => void;
467+
468+
/**
469+
* A description of a query handler.
470+
*/
471+
export type QueryHandlerOptions = { description?: string };
472+
473+
/**
474+
* A description of a signal handler.
475+
*/
476+
export type SignalHandlerOptions = { description?: string };
477+
478+
/**
479+
* A validator and description of an update handler.
480+
*/
481+
export type UpdateHandlerOptions<Args extends any[]> = { validator?: UpdateValidator<Args>; description?: string };

packages/workflow/src/internals.ts

Lines changed: 68 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,15 @@ import {
1010
TemporalFailure,
1111
Workflow,
1212
WorkflowExecutionAlreadyStartedError,
13-
WorkflowQueryType,
14-
WorkflowSignalType,
15-
WorkflowUpdateType,
13+
WorkflowQueryAnnotatedType,
14+
WorkflowSignalAnnotatedType,
15+
WorkflowUpdateAnnotatedType,
1616
ProtoFailure,
17-
WorkflowUpdateValidatorType,
1817
ApplicationFailure,
1918
} from '@temporalio/common';
2019
import { composeInterceptors } from '@temporalio/common/lib/interceptors';
2120
import { checkExtends } from '@temporalio/common/lib/type-helpers';
22-
import type { coresdk } from '@temporalio/proto';
21+
import type { coresdk, temporal } from '@temporalio/proto';
2322
import { alea, RNG } from './alea';
2423
import { RootCancellationScope } from './cancellation-scope';
2524
import { DeterminismViolationError, LocalActivityDoBackoff, isCancellation } from './errors';
@@ -125,12 +124,12 @@ export class Activator implements ActivationHandler {
125124
/**
126125
* Mapping of update name to handler and validator
127126
*/
128-
readonly updateHandlers = new Map<string, { handler: WorkflowUpdateType; validator?: WorkflowUpdateValidatorType }>();
127+
readonly updateHandlers = new Map<string, WorkflowUpdateAnnotatedType>();
129128

130129
/**
131130
* Mapping of signal name to handler
132131
*/
133-
readonly signalHandlers = new Map<string, WorkflowSignalType>();
132+
readonly signalHandlers = new Map<string, WorkflowSignalAnnotatedType>();
134133

135134
/**
136135
* A signal handler that catches calls for non-registered signal names.
@@ -157,38 +156,74 @@ export class Activator implements ActivationHandler {
157156
/**
158157
* Mapping of query name to handler
159158
*/
160-
public readonly queryHandlers = new Map<string, WorkflowQueryType>([
159+
public readonly queryHandlers = new Map<string, WorkflowQueryAnnotatedType>([
161160
[
162161
'__stack_trace',
163-
() => {
164-
return this.getStackTraces()
165-
.map((s) => s.formatted)
166-
.join('\n\n');
162+
{
163+
handler: () => {
164+
return this.getStackTraces()
165+
.map((s) => s.formatted)
166+
.join('\n\n');
167+
},
168+
description: 'Returns a sensible stack trace.',
167169
},
168170
],
169171
[
170172
'__enhanced_stack_trace',
171-
(): EnhancedStackTrace => {
172-
const { sourceMap } = this;
173-
const sdk: SDKInfo = { name: 'typescript', version: pkg.version };
174-
const stacks = this.getStackTraces().map(({ structured: locations }) => ({ locations }));
175-
const sources: Record<string, FileSlice[]> = {};
176-
if (this.showStackTraceSources) {
177-
for (const { locations } of stacks) {
178-
for (const { filePath } of locations) {
179-
if (!filePath) continue;
180-
const content = sourceMap?.sourcesContent?.[sourceMap?.sources.indexOf(filePath)];
181-
if (!content) continue;
182-
sources[filePath] = [
183-
{
184-
content,
185-
lineOffset: 0,
186-
},
187-
];
173+
{
174+
handler: (): EnhancedStackTrace => {
175+
const { sourceMap } = this;
176+
const sdk: SDKInfo = { name: 'typescript', version: pkg.version };
177+
const stacks = this.getStackTraces().map(({ structured: locations }) => ({ locations }));
178+
const sources: Record<string, FileSlice[]> = {};
179+
if (this.showStackTraceSources) {
180+
for (const { locations } of stacks) {
181+
for (const { filePath } of locations) {
182+
if (!filePath) continue;
183+
const content = sourceMap?.sourcesContent?.[sourceMap?.sources.indexOf(filePath)];
184+
if (!content) continue;
185+
sources[filePath] = [
186+
{
187+
content,
188+
lineOffset: 0,
189+
},
190+
];
191+
}
188192
}
189193
}
190-
}
191-
return { sdk, stacks, sources };
194+
return { sdk, stacks, sources };
195+
},
196+
description: 'Returns a stack trace annotated with source information.',
197+
},
198+
],
199+
[
200+
'__temporal_workflow_metadata',
201+
{
202+
handler: (): temporal.api.sdk.v1.IWorkflowMetadata => {
203+
const workflowType = this.info.workflowType;
204+
const queryDefinitions = Array.from(this.queryHandlers.entries()).map(([name, value]) => ({
205+
name,
206+
description: value.description,
207+
}));
208+
const signalDefinitions = Array.from(this.signalHandlers.entries()).map(([name, value]) => ({
209+
name,
210+
description: value.description,
211+
}));
212+
const updateDefinitions = Array.from(this.updateHandlers.entries()).map(([name, value]) => ({
213+
name,
214+
description: value.description,
215+
}));
216+
return {
217+
definition: {
218+
type: workflowType,
219+
description: null, // For now, do not set the workflow description in the TS SDK.
220+
queryDefinitions,
221+
signalDefinitions,
222+
updateDefinitions,
223+
},
224+
};
225+
},
226+
description: 'Returns metadata associated with this workflow.',
192227
},
193228
],
194229
]);
@@ -491,7 +526,7 @@ export class Activator implements ActivationHandler {
491526

492527
// Intentionally non-async function so this handler doesn't show up in the stack trace
493528
protected queryWorkflowNextHandler({ queryName, args }: QueryInput): Promise<unknown> {
494-
const fn = this.queryHandlers.get(queryName);
529+
const fn = this.queryHandlers.get(queryName)?.handler;
495530
if (fn === undefined) {
496531
const knownQueryTypes = [...this.queryHandlers.keys()].join(' ');
497532
// Fail the query
@@ -662,7 +697,7 @@ export class Activator implements ActivationHandler {
662697
}
663698

664699
public async signalWorkflowNextHandler({ signalName, args }: SignalInput): Promise<void> {
665-
const fn = this.signalHandlers.get(signalName);
700+
const fn = this.signalHandlers.get(signalName)?.handler;
666701
if (fn) {
667702
return await fn(...args);
668703
} else if (this.defaultSignalHandler) {

packages/workflow/src/workflow.ts

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import {
2222
import { versioningIntentToProto } from '@temporalio/common/lib/versioning-intent-enum';
2323
import { Duration, msOptionalToTs, msToNumber, msToTs, tsToMs } from '@temporalio/common/lib/time';
2424
import { composeInterceptors } from '@temporalio/common/lib/interceptors';
25+
import { temporal } from '@temporalio/proto';
2526
import { CancellationScope, registerSleepImplementation } from './cancellation-scope';
2627
import {
2728
ActivityInput,
@@ -39,7 +40,9 @@ import {
3940
DefaultSignalHandler,
4041
EnhancedStackTrace,
4142
Handler,
42-
UpdateValidator,
43+
QueryHandlerOptions,
44+
SignalHandlerOptions,
45+
UpdateHandlerOptions,
4346
WorkflowInfo,
4447
} from './interfaces';
4548
import { LocalActivityDoBackoff } from './errors';
@@ -1143,15 +1146,22 @@ export function defineQuery<Ret, Args extends any[] = [], Name extends string =
11431146
*
11441147
* @param def an {@link UpdateDefinition}, {@link SignalDefinition}, or {@link QueryDefinition} as returned by {@link defineUpdate}, {@link defineSignal}, or {@link defineQuery} respectively.
11451148
* @param handler a compatible handler function for the given definition or `undefined` to unset the handler.
1149+
* @param options an optional `description` of the handler and an optional update `validator` function.
11461150
*/
1147-
export function setHandler<Ret, Args extends any[], T extends SignalDefinition<Args> | QueryDefinition<Ret, Args>>(
1151+
export function setHandler<Ret, Args extends any[], T extends QueryDefinition<Ret, Args>>(
11481152
def: T,
1149-
handler: Handler<Ret, Args, T> | undefined
1153+
handler: Handler<Ret, Args, T> | undefined,
1154+
options?: QueryHandlerOptions
1155+
): void;
1156+
export function setHandler<Ret, Args extends any[], T extends SignalDefinition<Args>>(
1157+
def: T,
1158+
handler: Handler<Ret, Args, T> | undefined,
1159+
options?: SignalHandlerOptions
11501160
): void;
11511161
export function setHandler<Ret, Args extends any[], T extends UpdateDefinition<Ret, Args>>(
11521162
def: T,
11531163
handler: Handler<Ret, Args, T> | undefined,
1154-
options?: { validator: UpdateValidator<Args> }
1164+
options?: UpdateHandlerOptions<Args>
11551165
): void;
11561166

11571167
// For Updates and Signals we want to make a public guarantee something like the
@@ -1240,12 +1250,18 @@ export function setHandler<
12401250
Ret,
12411251
Args extends any[],
12421252
T extends UpdateDefinition<Ret, Args> | SignalDefinition<Args> | QueryDefinition<Ret, Args>,
1243-
>(def: T, handler: Handler<Ret, Args, T> | undefined, options?: { validator: UpdateValidator<Args> }): void {
1253+
>(
1254+
def: T,
1255+
handler: Handler<Ret, Args, T> | undefined,
1256+
options?: QueryHandlerOptions | SignalHandlerOptions | UpdateHandlerOptions<Args>
1257+
): void {
12441258
const activator = assertInWorkflowContext('Workflow.setHandler(...) may only be used from a Workflow Execution.');
1259+
const description = options?.description;
12451260
if (def.type === 'update') {
12461261
if (typeof handler === 'function') {
1247-
const validator = options?.validator as WorkflowUpdateValidatorType | undefined;
1248-
activator.updateHandlers.set(def.name, { handler, validator });
1262+
const updateOptions = options as UpdateHandlerOptions<Args> | undefined;
1263+
const validator = updateOptions?.validator as WorkflowUpdateValidatorType | undefined;
1264+
activator.updateHandlers.set(def.name, { handler, validator, description });
12491265
activator.dispatchBufferedUpdates();
12501266
} else if (handler == null) {
12511267
activator.updateHandlers.delete(def.name);
@@ -1254,7 +1270,7 @@ export function setHandler<
12541270
}
12551271
} else if (def.type === 'signal') {
12561272
if (typeof handler === 'function') {
1257-
activator.signalHandlers.set(def.name, handler as any);
1273+
activator.signalHandlers.set(def.name, { handler: handler as any, description });
12581274
activator.dispatchBufferedSignals();
12591275
} else if (handler == null) {
12601276
activator.signalHandlers.delete(def.name);
@@ -1263,7 +1279,7 @@ export function setHandler<
12631279
}
12641280
} else if (def.type === 'query') {
12651281
if (typeof handler === 'function') {
1266-
activator.queryHandlers.set(def.name, handler as any);
1282+
activator.queryHandlers.set(def.name, { handler: handler as any, description });
12671283
} else if (handler == null) {
12681284
activator.queryHandlers.delete(def.name);
12691285
} else {
@@ -1354,3 +1370,4 @@ export function upsertSearchAttributes(searchAttributes: SearchAttributes): void
13541370

13551371
export const stackTraceQuery = defineQuery<string>('__stack_trace');
13561372
export const enhancedStackTraceQuery = defineQuery<EnhancedStackTrace>('__enhanced_stack_trace');
1373+
export const workflowMetadataQuery = defineQuery<temporal.api.sdk.v1.IWorkflowMetadata>('__temporal_workflow_metadata');

0 commit comments

Comments
 (0)