Skip to content

Commit 13f750a

Browse files
authored
v4: implement onCancel callbacks (#2022)
* v4: implement onCancel callbacks * chat example * Add changeset * A few improvements
1 parent 15816e9 commit 13f750a

File tree

26 files changed

+575
-110
lines changed

26 files changed

+575
-110
lines changed

.changeset/real-rats-drop.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
---
4+
5+
Add onCancel lifecycle hook

apps/webapp/app/components/runs/v3/RunIcon.tsx

+1
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ export function RunIcon({ name, className, spanName }: TaskIconProps) {
9797
case "task-hook-onResume":
9898
case "task-hook-onComplete":
9999
case "task-hook-cleanup":
100+
case "task-hook-onCancel":
100101
return <FunctionIcon className={cn(className, "text-text-dimmed")} />;
101102
case "task-hook-onFailure":
102103
case "task-hook-catchError":

apps/webapp/app/v3/services/cancelTaskRun.server.ts

-23
Original file line numberDiff line numberDiff line change
@@ -47,29 +47,6 @@ export class CancelTaskRunService extends BaseService {
4747
tx: this._prisma,
4848
});
4949

50-
const inProgressEvents = await eventRepository.queryIncompleteEvents(
51-
getTaskEventStoreTableForRun(taskRun),
52-
{
53-
runId: taskRun.friendlyId,
54-
},
55-
taskRun.createdAt,
56-
taskRun.completedAt ?? undefined
57-
);
58-
59-
logger.debug("Cancelling in-progress events", {
60-
inProgressEvents: inProgressEvents.map((event) => event.id),
61-
});
62-
63-
await Promise.all(
64-
inProgressEvents.map((event) => {
65-
return eventRepository.cancelEvent(
66-
event,
67-
options?.cancelledAt ?? new Date(),
68-
options?.reason ?? "Run cancelled"
69-
);
70-
})
71-
);
72-
7350
return {
7451
id: result.run.id,
7552
};

packages/cli-v3/src/entryPoints/dev-run-worker.ts

+32-8
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import {
2323
TaskRunExecution,
2424
timeout,
2525
TriggerConfig,
26+
UsageMeasurement,
2627
waitUntil,
2728
WorkerManifest,
2829
WorkerToExecutorMessageCatalog,
@@ -232,7 +233,10 @@ async function bootstrap() {
232233

233234
let _execution: TaskRunExecution | undefined;
234235
let _isRunning = false;
236+
let _isCancelled = false;
235237
let _tracingSDK: TracingSDK | undefined;
238+
let _executionMeasurement: UsageMeasurement | undefined;
239+
const cancelController = new AbortController();
236240

237241
const zodIpc = new ZodIpcConnection({
238242
listenSchema: WorkerToExecutorMessageCatalog,
@@ -403,18 +407,17 @@ const zodIpc = new ZodIpcConnection({
403407
getNumberEnvVar("TRIGGER_RUN_METADATA_FLUSH_INTERVAL", 1000)
404408
);
405409

406-
const measurement = usage.start();
410+
_executionMeasurement = usage.start();
407411

408-
// This lives outside of the executor because this will eventually be moved to the controller level
409-
const signal = execution.run.maxDuration
410-
? timeout.abortAfterTimeout(execution.run.maxDuration)
411-
: undefined;
412+
const timeoutController = timeout.abortAfterTimeout(execution.run.maxDuration);
413+
414+
const signal = AbortSignal.any([cancelController.signal, timeoutController.signal]);
412415

413416
const { result } = await executor.execute(execution, metadata, traceContext, signal);
414417

415-
const usageSample = usage.stop(measurement);
418+
if (_isRunning && !_isCancelled) {
419+
const usageSample = usage.stop(_executionMeasurement);
416420

417-
if (_isRunning) {
418421
return sender.send("TASK_RUN_COMPLETED", {
419422
execution,
420423
result: {
@@ -458,7 +461,16 @@ const zodIpc = new ZodIpcConnection({
458461
WAIT_COMPLETED_NOTIFICATION: async () => {
459462
await managedWorkerRuntime.completeWaitpoints([]);
460463
},
461-
FLUSH: async ({ timeoutInMs }, sender) => {
464+
CANCEL: async ({ timeoutInMs }) => {
465+
_isCancelled = true;
466+
cancelController.abort("run cancelled");
467+
await callCancelHooks(timeoutInMs);
468+
if (_executionMeasurement) {
469+
usage.stop(_executionMeasurement);
470+
}
471+
await flushAll(timeoutInMs);
472+
},
473+
FLUSH: async ({ timeoutInMs }) => {
462474
await flushAll(timeoutInMs);
463475
},
464476
WAITPOINT_CREATED: async ({ wait, waitpoint }) => {
@@ -470,6 +482,18 @@ const zodIpc = new ZodIpcConnection({
470482
},
471483
});
472484

485+
async function callCancelHooks(timeoutInMs: number = 10_000) {
486+
const now = performance.now();
487+
488+
try {
489+
await Promise.race([lifecycleHooks.callOnCancelHookListeners(), setTimeout(timeoutInMs)]);
490+
} finally {
491+
const duration = performance.now() - now;
492+
493+
log(`Called cancel hooks in ${duration}ms`);
494+
}
495+
}
496+
473497
async function flushAll(timeoutInMs: number = 10_000) {
474498
const now = performance.now();
475499

packages/cli-v3/src/entryPoints/managed-run-worker.ts

+31-7
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import {
2222
TaskRunExecution,
2323
timeout,
2424
TriggerConfig,
25+
UsageMeasurement,
2526
waitUntil,
2627
WorkerManifest,
2728
WorkerToExecutorMessageCatalog,
@@ -229,7 +230,10 @@ async function bootstrap() {
229230

230231
let _execution: TaskRunExecution | undefined;
231232
let _isRunning = false;
233+
let _isCancelled = false;
232234
let _tracingSDK: TracingSDK | undefined;
235+
let _executionMeasurement: UsageMeasurement | undefined;
236+
const cancelController = new AbortController();
233237

234238
const zodIpc = new ZodIpcConnection({
235239
listenSchema: WorkerToExecutorMessageCatalog,
@@ -398,18 +402,17 @@ const zodIpc = new ZodIpcConnection({
398402
getNumberEnvVar("TRIGGER_RUN_METADATA_FLUSH_INTERVAL", 1000)
399403
);
400404

401-
const measurement = usage.start();
405+
_executionMeasurement = usage.start();
402406

403-
// This lives outside of the executor because this will eventually be moved to the controller level
404-
const signal = execution.run.maxDuration
405-
? timeout.abortAfterTimeout(execution.run.maxDuration)
406-
: undefined;
407+
const timeoutController = timeout.abortAfterTimeout(execution.run.maxDuration);
408+
409+
const signal = AbortSignal.any([cancelController.signal, timeoutController.signal]);
407410

408411
const { result } = await executor.execute(execution, metadata, traceContext, signal);
409412

410-
const usageSample = usage.stop(measurement);
413+
if (_isRunning && !_isCancelled) {
414+
const usageSample = usage.stop(_executionMeasurement);
411415

412-
if (_isRunning) {
413416
return sender.send("TASK_RUN_COMPLETED", {
414417
execution,
415418
result: {
@@ -454,6 +457,15 @@ const zodIpc = new ZodIpcConnection({
454457
FLUSH: async ({ timeoutInMs }, sender) => {
455458
await flushAll(timeoutInMs);
456459
},
460+
CANCEL: async ({ timeoutInMs }, sender) => {
461+
_isCancelled = true;
462+
cancelController.abort("run cancelled");
463+
await callCancelHooks(timeoutInMs);
464+
if (_executionMeasurement) {
465+
usage.stop(_executionMeasurement);
466+
}
467+
await flushAll(timeoutInMs);
468+
},
457469
WAITPOINT_CREATED: async ({ wait, waitpoint }) => {
458470
managedWorkerRuntime.associateWaitWithWaitpoint(wait.id, waitpoint.id);
459471
},
@@ -463,6 +475,18 @@ const zodIpc = new ZodIpcConnection({
463475
},
464476
});
465477

478+
async function callCancelHooks(timeoutInMs: number = 10_000) {
479+
const now = performance.now();
480+
481+
try {
482+
await Promise.race([lifecycleHooks.callOnCancelHookListeners(), setTimeout(timeoutInMs)]);
483+
} finally {
484+
const duration = performance.now() - now;
485+
486+
console.log(`Called cancel hooks in ${duration}ms`);
487+
}
488+
}
489+
466490
async function flushAll(timeoutInMs: number = 10_000) {
467491
const now = performance.now();
468492

packages/cli-v3/src/executions/taskRunProcess.ts

+13-2
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,9 @@ export class TaskRunProcess {
109109
this._isBeingCancelled = true;
110110

111111
try {
112-
await this.#flush();
112+
await this.#cancel();
113113
} catch (err) {
114-
console.error("Error flushing task run process", { err });
114+
console.error("Error cancelling task run process", { err });
115115
}
116116

117117
await this.kill();
@@ -120,6 +120,10 @@ export class TaskRunProcess {
120120
async cleanup(kill = true) {
121121
this._isPreparedForNextRun = false;
122122

123+
if (this._isBeingCancelled) {
124+
return;
125+
}
126+
123127
try {
124128
await this.#flush();
125129
} catch (err) {
@@ -224,10 +228,17 @@ export class TaskRunProcess {
224228
await this._ipc?.sendWithAck("FLUSH", { timeoutInMs }, timeoutInMs + 1_000);
225229
}
226230

231+
async #cancel(timeoutInMs: number = 30_000) {
232+
logger.debug("sending cancel message to task run process", { pid: this.pid, timeoutInMs });
233+
234+
await this._ipc?.sendWithAck("CANCEL", { timeoutInMs }, timeoutInMs + 1_000);
235+
}
236+
227237
async execute(
228238
params: TaskRunProcessExecuteParams,
229239
isWarmStart?: boolean
230240
): Promise<TaskRunExecutionResult> {
241+
this._isBeingCancelled = false;
231242
this._isPreparedForNextRun = false;
232243
this._isPreparedForNextAttempt = false;
233244

packages/core/src/utils.ts

+22
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,25 @@ export async function tryCatch<T, E = Error>(
1616
return [error as E, null];
1717
}
1818
}
19+
20+
export type Deferred<T> = {
21+
promise: Promise<T>;
22+
resolve: (value: T) => void;
23+
reject: (reason?: any) => void;
24+
};
25+
26+
export function promiseWithResolvers<T>(): Deferred<T> {
27+
let resolve!: (value: T) => void;
28+
let reject!: (reason?: any) => void;
29+
30+
const promise = new Promise<T>((_resolve, _reject) => {
31+
resolve = _resolve;
32+
reject = _reject;
33+
});
34+
35+
return {
36+
promise,
37+
resolve,
38+
reject,
39+
};
40+
}

packages/core/src/v3/lifecycle-hooks-api.ts

+3
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,7 @@ export type {
3232
AnyOnCleanupHookFunction,
3333
TaskCleanupHookParams,
3434
TaskWait,
35+
TaskCancelHookParams,
36+
OnCancelHookFunction,
37+
AnyOnCancelHookFunction,
3538
} from "./lifecycleHooks/types.js";

packages/core/src/v3/lifecycleHooks/index.ts

+28
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import {
1313
AnyOnStartHookFunction,
1414
AnyOnSuccessHookFunction,
1515
AnyOnWaitHookFunction,
16+
AnyOnCancelHookFunction,
1617
RegisteredHookFunction,
1718
RegisterHookFunctionParams,
1819
TaskWait,
@@ -260,6 +261,33 @@ export class LifecycleHooksAPI {
260261
this.#getManager().registerOnResumeHookListener(listener);
261262
}
262263

264+
public registerGlobalCancelHook(hook: RegisterHookFunctionParams<AnyOnCancelHookFunction>): void {
265+
this.#getManager().registerGlobalCancelHook(hook);
266+
}
267+
268+
public registerTaskCancelHook(
269+
taskId: string,
270+
hook: RegisterHookFunctionParams<AnyOnCancelHookFunction>
271+
): void {
272+
this.#getManager().registerTaskCancelHook(taskId, hook);
273+
}
274+
275+
public getTaskCancelHook(taskId: string): AnyOnCancelHookFunction | undefined {
276+
return this.#getManager().getTaskCancelHook(taskId);
277+
}
278+
279+
public getGlobalCancelHooks(): RegisteredHookFunction<AnyOnCancelHookFunction>[] {
280+
return this.#getManager().getGlobalCancelHooks();
281+
}
282+
283+
public callOnCancelHookListeners(): Promise<void> {
284+
return this.#getManager().callOnCancelHookListeners();
285+
}
286+
287+
public registerOnCancelHookListener(listener: () => Promise<void>): void {
288+
this.#getManager().registerOnCancelHookListener(listener);
289+
}
290+
263291
#getManager(): LifecycleHooksManager {
264292
return getGlobal(API_NAME) ?? NOOP_LIFECYCLE_HOOKS_MANAGER;
265293
}

0 commit comments

Comments
 (0)