Skip to content

Commit 268c9f6

Browse files
committed
implement onWait/onResume
1 parent 3ca5985 commit 268c9f6

File tree

8 files changed

+401
-5
lines changed

8 files changed

+401
-5
lines changed

packages/core/src/v3/lifecycle-hooks-api.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,5 @@ export type {
3131
OnCleanupHookFunction,
3232
AnyOnCleanupHookFunction,
3333
TaskCleanupHookParams,
34+
TaskWait,
3435
} from "./lifecycleHooks/types.js";

packages/core/src/v3/lifecycleHooks/index.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import {
1515
AnyOnWaitHookFunction,
1616
RegisteredHookFunction,
1717
RegisterHookFunctionParams,
18+
TaskWait,
1819
type LifecycleHooksManager,
1920
} from "./types.js";
2021

@@ -243,6 +244,22 @@ export class LifecycleHooksAPI {
243244
return this.#getManager().getGlobalCleanupHooks();
244245
}
245246

247+
public callOnWaitHookListeners(wait: TaskWait): Promise<void> {
248+
return this.#getManager().callOnWaitHookListeners(wait);
249+
}
250+
251+
public callOnResumeHookListeners(wait: TaskWait): Promise<void> {
252+
return this.#getManager().callOnResumeHookListeners(wait);
253+
}
254+
255+
public registerOnWaitHookListener(listener: (wait: TaskWait) => Promise<void>): void {
256+
this.#getManager().registerOnWaitHookListener(listener);
257+
}
258+
259+
public registerOnResumeHookListener(listener: (wait: TaskWait) => Promise<void>): void {
260+
this.#getManager().registerOnResumeHookListener(listener);
261+
}
262+
246263
#getManager(): LifecycleHooksManager {
247264
return getGlobal(API_NAME) ?? NOOP_LIFECYCLE_HOOKS_MANAGER;
248265
}

packages/core/src/v3/lifecycleHooks/manager.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
AnyOnCatchErrorHookFunction,
1313
AnyOnMiddlewareHookFunction,
1414
AnyOnCleanupHookFunction,
15+
TaskWait,
1516
} from "./types.js";
1617

1718
export class StandardLifecycleHooksManager implements LifecycleHooksManager {
@@ -58,6 +59,25 @@ export class StandardLifecycleHooksManager implements LifecycleHooksManager {
5859
private taskCleanupHooks: Map<string, RegisteredHookFunction<AnyOnCleanupHookFunction>> =
5960
new Map();
6061

62+
private onWaitHookListeners: ((wait: TaskWait) => Promise<void>)[] = [];
63+
private onResumeHookListeners: ((wait: TaskWait) => Promise<void>)[] = [];
64+
65+
registerOnWaitHookListener(listener: (wait: TaskWait) => Promise<void>): void {
66+
this.onWaitHookListeners.push(listener);
67+
}
68+
69+
async callOnWaitHookListeners(wait: TaskWait): Promise<void> {
70+
await Promise.all(this.onWaitHookListeners.map((listener) => listener(wait)));
71+
}
72+
73+
registerOnResumeHookListener(listener: (wait: TaskWait) => Promise<void>): void {
74+
this.onResumeHookListeners.push(listener);
75+
}
76+
77+
async callOnResumeHookListeners(wait: TaskWait): Promise<void> {
78+
await Promise.all(this.onResumeHookListeners.map((listener) => listener(wait)));
79+
}
80+
6181
registerGlobalStartHook(hook: RegisterHookFunctionParams<AnyOnStartHookFunction>): void {
6282
const id = generateHookId(hook);
6383

@@ -377,6 +397,22 @@ export class StandardLifecycleHooksManager implements LifecycleHooksManager {
377397
}
378398

379399
export class NoopLifecycleHooksManager implements LifecycleHooksManager {
400+
registerOnWaitHookListener(listener: (wait: TaskWait) => Promise<void>): void {
401+
// Noop
402+
}
403+
404+
async callOnWaitHookListeners(wait: TaskWait): Promise<void> {
405+
// Noop
406+
}
407+
408+
registerOnResumeHookListener(listener: (wait: TaskWait) => Promise<void>): void {
409+
// Noop
410+
}
411+
412+
async callOnResumeHookListeners(wait: TaskWait): Promise<void> {
413+
// Noop
414+
}
415+
380416
registerGlobalInitHook(hook: RegisterHookFunctionParams<AnyOnInitHookFunction>): void {
381417
// Noop
382418
}

packages/core/src/v3/lifecycleHooks/types.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,30 @@ export type OnStartHookFunction<TPayload, TInitOutput extends TaskInitOutput = T
3333

3434
export type AnyOnStartHookFunction = OnStartHookFunction<unknown, TaskInitOutput>;
3535

36+
export type TaskWait =
37+
| {
38+
type: "duration";
39+
date: Date;
40+
}
41+
| {
42+
type: "token";
43+
token: string;
44+
}
45+
| {
46+
type: "task";
47+
runId: string;
48+
}
49+
| {
50+
type: "batch";
51+
batchId: string;
52+
runCount: number;
53+
};
54+
3655
export type TaskWaitHookParams<
3756
TPayload = unknown,
3857
TInitOutput extends TaskInitOutput = TaskInitOutput,
3958
> = {
59+
wait: TaskWait;
4060
ctx: TaskRunContext;
4161
payload: TPayload;
4262
task: string;
@@ -55,6 +75,7 @@ export type TaskResumeHookParams<
5575
TInitOutput extends TaskInitOutput = TaskInitOutput,
5676
> = {
5777
ctx: TaskRunContext;
78+
wait: TaskWait;
5879
payload: TPayload;
5980
task: string;
6081
signal?: AbortSignal;
@@ -280,4 +301,10 @@ export interface LifecycleHooksManager {
280301
): void;
281302
getTaskCleanupHook(taskId: string): AnyOnCleanupHookFunction | undefined;
282303
getGlobalCleanupHooks(): RegisteredHookFunction<AnyOnCleanupHookFunction>[];
304+
305+
callOnWaitHookListeners(wait: TaskWait): Promise<void>;
306+
registerOnWaitHookListener(listener: (wait: TaskWait) => Promise<void>): void;
307+
308+
callOnResumeHookListeners(wait: TaskWait): Promise<void>;
309+
registerOnResumeHookListener(listener: (wait: TaskWait) => Promise<void>): void;
283310
}

packages/core/src/v3/runtime/managedRuntimeManager.ts

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { lifecycleHooks } from "../lifecycle-hooks-api.js";
12
import {
23
BatchTaskRunExecutionResult,
34
CompletedWaitpoint,
@@ -44,9 +45,19 @@ export class ManagedRuntimeManager implements RuntimeManager {
4445
this.resolversByWaitId.set(params.id, resolve);
4546
});
4647

48+
await lifecycleHooks.callOnWaitHookListeners({
49+
type: "task",
50+
runId: params.id,
51+
});
52+
4753
const waitpoint = await promise;
4854
const result = this.waitpointToTaskRunExecutionResult(waitpoint);
4955

56+
await lifecycleHooks.callOnResumeHookListeners({
57+
type: "task",
58+
runId: params.id,
59+
});
60+
5061
return result;
5162
});
5263
}
@@ -70,8 +81,20 @@ export class ManagedRuntimeManager implements RuntimeManager {
7081
})
7182
);
7283

84+
await lifecycleHooks.callOnWaitHookListeners({
85+
type: "batch",
86+
batchId: params.id,
87+
runCount: params.runCount,
88+
});
89+
7390
const waitpoints = await promise;
7491

92+
await lifecycleHooks.callOnResumeHookListeners({
93+
type: "batch",
94+
batchId: params.id,
95+
runCount: params.runCount,
96+
});
97+
7598
return {
7699
id: params.id,
77100
items: waitpoints.map(this.waitpointToTaskRunExecutionResult),
@@ -91,8 +114,32 @@ export class ManagedRuntimeManager implements RuntimeManager {
91114
this.resolversByWaitId.set(waitpointFriendlyId, resolve);
92115
});
93116

117+
if (finishDate) {
118+
await lifecycleHooks.callOnWaitHookListeners({
119+
type: "duration",
120+
date: finishDate,
121+
});
122+
} else {
123+
await lifecycleHooks.callOnWaitHookListeners({
124+
type: "token",
125+
token: waitpointFriendlyId,
126+
});
127+
}
128+
94129
const waitpoint = await promise;
95130

131+
if (finishDate) {
132+
await lifecycleHooks.callOnResumeHookListeners({
133+
type: "duration",
134+
date: finishDate,
135+
});
136+
} else {
137+
await lifecycleHooks.callOnResumeHookListeners({
138+
type: "token",
139+
token: waitpointFriendlyId,
140+
});
141+
}
142+
96143
return {
97144
ok: !waitpoint.outputIsError,
98145
output: waitpoint.output,

packages/core/src/v3/workers/taskExecutor.ts

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import {
1515
RegisteredHookFunction,
1616
TaskCompleteResult,
1717
TaskInitOutput,
18+
TaskWait,
1819
} from "../lifecycleHooks/types.js";
1920
import { recordSpanException, TracingSDK } from "../otel/index.js";
2021
import { runTimelineMetrics } from "../run-timeline-metrics-api.js";
@@ -126,6 +127,14 @@ export class TaskExecutor {
126127

127128
parsedPayload = await this.#parsePayload(payloadResult);
128129

130+
lifecycleHooks.registerOnWaitHookListener(async (wait) => {
131+
await this.#callOnWaitFunctions(wait, parsedPayload, ctx, initOutput, signal);
132+
});
133+
134+
lifecycleHooks.registerOnResumeHookListener(async (wait) => {
135+
await this.#callOnResumeFunctions(wait, parsedPayload, ctx, initOutput, signal);
136+
});
137+
129138
const executeTask = async (payload: any) => {
130139
const [runError, output] = await tryCatch(
131140
(async () => {
@@ -383,6 +392,146 @@ export class TaskExecutor {
383392
});
384393
}
385394

395+
async #callOnWaitFunctions(
396+
wait: TaskWait,
397+
payload: unknown,
398+
ctx: TaskRunContext,
399+
initOutput: TaskInitOutput,
400+
signal?: AbortSignal
401+
) {
402+
const globalWaitHooks = lifecycleHooks.getGlobalWaitHooks();
403+
const taskWaitHook = lifecycleHooks.getTaskWaitHook(this.task.id);
404+
405+
if (globalWaitHooks.length === 0 && !taskWaitHook) {
406+
return;
407+
}
408+
409+
const result = await runTimelineMetrics.measureMetric(
410+
"trigger.dev/execution",
411+
"onWait",
412+
async () => {
413+
for (const hook of globalWaitHooks) {
414+
const [hookError] = await tryCatch(
415+
this._tracer.startActiveSpan(
416+
hook.name ? `onWait/${hook.name}` : "onWait/global",
417+
async (span) => {
418+
await hook.fn({ payload, ctx, signal, task: this.task.id, wait, init: initOutput });
419+
},
420+
{
421+
attributes: {
422+
[SemanticInternalAttributes.STYLE_ICON]: "task-hook-onWait",
423+
[SemanticInternalAttributes.COLLAPSED]: true,
424+
},
425+
}
426+
)
427+
);
428+
429+
if (hookError) {
430+
throw hookError;
431+
}
432+
}
433+
434+
if (taskWaitHook) {
435+
const [hookError] = await tryCatch(
436+
this._tracer.startActiveSpan(
437+
"onWait/task",
438+
async (span) => {
439+
await taskWaitHook({
440+
payload,
441+
ctx,
442+
signal,
443+
task: this.task.id,
444+
wait,
445+
init: initOutput,
446+
});
447+
},
448+
{
449+
attributes: {
450+
[SemanticInternalAttributes.STYLE_ICON]: "task-hook-onWait",
451+
[SemanticInternalAttributes.COLLAPSED]: true,
452+
},
453+
}
454+
)
455+
);
456+
457+
if (hookError) {
458+
throw hookError;
459+
}
460+
}
461+
}
462+
);
463+
}
464+
465+
async #callOnResumeFunctions(
466+
wait: TaskWait,
467+
payload: unknown,
468+
ctx: TaskRunContext,
469+
initOutput: TaskInitOutput,
470+
signal?: AbortSignal
471+
) {
472+
const globalResumeHooks = lifecycleHooks.getGlobalResumeHooks();
473+
const taskResumeHook = lifecycleHooks.getTaskResumeHook(this.task.id);
474+
475+
if (globalResumeHooks.length === 0 && !taskResumeHook) {
476+
return;
477+
}
478+
479+
const result = await runTimelineMetrics.measureMetric(
480+
"trigger.dev/execution",
481+
"onResume",
482+
async () => {
483+
for (const hook of globalResumeHooks) {
484+
const [hookError] = await tryCatch(
485+
this._tracer.startActiveSpan(
486+
hook.name ? `onResume/${hook.name}` : "onResume/global",
487+
async (span) => {
488+
await hook.fn({ payload, ctx, signal, task: this.task.id, wait, init: initOutput });
489+
},
490+
{
491+
attributes: {
492+
[SemanticInternalAttributes.STYLE_ICON]: "task-hook-onResume",
493+
[SemanticInternalAttributes.COLLAPSED]: true,
494+
},
495+
}
496+
)
497+
);
498+
499+
if (hookError) {
500+
throw hookError;
501+
}
502+
}
503+
504+
if (taskResumeHook) {
505+
const [hookError] = await tryCatch(
506+
this._tracer.startActiveSpan(
507+
"onResume/task",
508+
async (span) => {
509+
await taskResumeHook({
510+
payload,
511+
ctx,
512+
signal,
513+
task: this.task.id,
514+
wait,
515+
init: initOutput,
516+
});
517+
},
518+
{
519+
attributes: {
520+
[SemanticInternalAttributes.STYLE_ICON]: "task-hook-onResume",
521+
[SemanticInternalAttributes.COLLAPSED]: true,
522+
},
523+
}
524+
)
525+
);
526+
527+
if (hookError) {
528+
throw hookError;
529+
}
530+
}
531+
}
532+
);
533+
}
534+
386535
async #callInitFunctions(payload: unknown, ctx: TaskRunContext, signal?: AbortSignal) {
387536
const globalInitHooks = lifecycleHooks.getGlobalInitHooks();
388537
const taskInitHook = lifecycleHooks.getTaskInitHook(this.task.id);

0 commit comments

Comments
 (0)