Skip to content

Commit 30705fd

Browse files
committed
implement cleanup
1 parent d0a5c16 commit 30705fd

File tree

2 files changed

+299
-8
lines changed

2 files changed

+299
-8
lines changed

packages/core/src/v3/workers/taskExecutor.ts

Lines changed: 84 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,8 @@ export class TaskExecutor {
174174
);
175175
}
176176

177+
await this.#cleanupAndWaitUntil(payload, ctx, initOutput, signal);
178+
177179
return {
178180
id: execution.run.id,
179181
ok: false,
@@ -191,6 +193,8 @@ export class TaskExecutor {
191193

192194
if (outputError) {
193195
recordSpanException(span, outputError);
196+
await this.#cleanupAndWaitUntil(payload, ctx, initOutput, signal);
197+
194198
return this.#internalErrorResult(
195199
execution,
196200
TaskRunErrorCodes.TASK_OUTPUT_ERROR,
@@ -208,6 +212,8 @@ export class TaskExecutor {
208212

209213
if (exportError) {
210214
recordSpanException(span, exportError);
215+
await this.#cleanupAndWaitUntil(payload, ctx, initOutput, signal);
216+
211217
return this.#internalErrorResult(
212218
execution,
213219
TaskRunErrorCodes.TASK_OUTPUT_ERROR,
@@ -236,6 +242,8 @@ export class TaskExecutor {
236242
signal
237243
);
238244

245+
await this.#cleanupAndWaitUntil(payload, ctx, initOutput, signal);
246+
239247
return {
240248
ok: true,
241249
id: execution.run.id,
@@ -690,21 +698,90 @@ export class TaskExecutor {
690698
);
691699
}
692700

693-
async #callTaskCleanup(
701+
async #cleanupAndWaitUntil(
694702
payload: unknown,
695703
ctx: TaskRunContext,
696-
init: unknown,
704+
initOutput: any,
697705
signal?: AbortSignal
698706
) {
699-
const cleanupFn = this.task.fns.cleanup;
707+
await this.#callCleanupFunctions(payload, ctx, initOutput, signal);
708+
await this.#blockForWaitUntil();
709+
}
700710

701-
if (!cleanupFn) {
711+
async #callCleanupFunctions(
712+
payload: unknown,
713+
ctx: TaskRunContext,
714+
initOutput: any,
715+
signal?: AbortSignal
716+
) {
717+
const globalCleanupHooks = lifecycleHooks.getGlobalCleanupHooks();
718+
const taskCleanupHook = lifecycleHooks.getTaskCleanupHook(this.task.id);
719+
720+
if (globalCleanupHooks.length === 0 && !taskCleanupHook) {
702721
return;
703722
}
704723

705-
return this._tracer.startActiveSpan("cleanup", async (span) => {
706-
return await cleanupFn(payload, { ctx, init, signal });
707-
});
724+
return this._tracer.startActiveSpan(
725+
"hooks.cleanup",
726+
async (span) => {
727+
return await runTimelineMetrics.measureMetric(
728+
"trigger.dev/execution",
729+
"cleanup",
730+
async () => {
731+
for (const hook of globalCleanupHooks) {
732+
const [hookError] = await tryCatch(
733+
this._tracer.startActiveSpan(
734+
hook.name ?? "global",
735+
async (span) => {
736+
await hook.fn({
737+
payload,
738+
ctx,
739+
signal,
740+
task: this.task.id,
741+
init: initOutput,
742+
});
743+
},
744+
{
745+
attributes: {
746+
[SemanticInternalAttributes.STYLE_ICON]: "tabler-function",
747+
},
748+
}
749+
)
750+
);
751+
// Ignore errors from cleanup functions
752+
}
753+
754+
if (taskCleanupHook) {
755+
const [hookError] = await tryCatch(
756+
this._tracer.startActiveSpan(
757+
"task",
758+
async (span) => {
759+
await taskCleanupHook({
760+
payload,
761+
ctx,
762+
signal,
763+
task: this.task.id,
764+
init: initOutput,
765+
});
766+
},
767+
{
768+
attributes: {
769+
[SemanticInternalAttributes.STYLE_ICON]: "tabler-function",
770+
},
771+
}
772+
)
773+
);
774+
// Ignore errors from cleanup functions
775+
}
776+
}
777+
);
778+
},
779+
{
780+
attributes: {
781+
[SemanticInternalAttributes.STYLE_ICON]: "tabler-function",
782+
},
783+
}
784+
);
708785
}
709786

710787
async #blockForWaitUntil() {

packages/core/test/taskExecutor.test.ts

Lines changed: 215 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -779,7 +779,8 @@ describe("TaskExecutor", () => {
779779
},
780780
});
781781

782-
expect((result as any).result.retry.delay).toBeCloseTo(30000, -1);
782+
expect((result as any).result.retry.delay).toBeGreaterThan(29900);
783+
expect((result as any).result.retry.delay).toBeLessThan(30100);
783784
});
784785

785786
test("should execute middleware hooks in correct order around other hooks", async () => {
@@ -1203,6 +1204,219 @@ describe("TaskExecutor", () => {
12031204
},
12041205
});
12051206
});
1207+
1208+
test("should call cleanup hooks in correct order after other hooks but before middleware completion", async () => {
1209+
const executionOrder: string[] = [];
1210+
1211+
// Register global init hook
1212+
lifecycleHooks.registerGlobalInitHook({
1213+
id: "test-init",
1214+
fn: async () => {
1215+
executionOrder.push("init");
1216+
return {
1217+
foo: "bar",
1218+
};
1219+
},
1220+
});
1221+
1222+
// Register global start hook
1223+
lifecycleHooks.registerGlobalStartHook({
1224+
id: "global-start",
1225+
fn: async () => {
1226+
executionOrder.push("start");
1227+
},
1228+
});
1229+
1230+
// Register global success hook
1231+
lifecycleHooks.registerGlobalSuccessHook({
1232+
id: "global-success",
1233+
fn: async () => {
1234+
executionOrder.push("success");
1235+
},
1236+
});
1237+
1238+
// Register global complete hook
1239+
lifecycleHooks.registerGlobalCompleteHook({
1240+
id: "global-complete",
1241+
fn: async () => {
1242+
executionOrder.push("complete");
1243+
},
1244+
});
1245+
1246+
// Register global cleanup hooks
1247+
lifecycleHooks.registerGlobalCleanupHook({
1248+
id: "global-cleanup-1",
1249+
fn: async ({ init }) => {
1250+
executionOrder.push("global-cleanup-1");
1251+
// Verify we have access to init data
1252+
expect(init).toEqual({ foo: "bar" });
1253+
},
1254+
});
1255+
1256+
lifecycleHooks.registerGlobalCleanupHook({
1257+
id: "global-cleanup-2",
1258+
fn: async ({ init }) => {
1259+
executionOrder.push("global-cleanup-2");
1260+
// Verify we have access to init data
1261+
expect(init).toEqual({ foo: "bar" });
1262+
},
1263+
});
1264+
1265+
// Register task-specific cleanup hook
1266+
lifecycleHooks.registerTaskCleanupHook("test-task", {
1267+
id: "task-cleanup",
1268+
fn: async ({ init }) => {
1269+
executionOrder.push("task-cleanup");
1270+
// Verify we have access to init data
1271+
expect(init).toEqual({ foo: "bar" });
1272+
},
1273+
});
1274+
1275+
// Register middleware to verify cleanup happens before middleware completion
1276+
lifecycleHooks.registerGlobalMiddlewareHook({
1277+
id: "global-middleware",
1278+
fn: async ({ next }) => {
1279+
executionOrder.push("middleware-before");
1280+
await next();
1281+
executionOrder.push("middleware-after");
1282+
},
1283+
});
1284+
1285+
const task = {
1286+
id: "test-task",
1287+
fns: {
1288+
run: async (payload: any, params: RunFnParams<any>) => {
1289+
executionOrder.push("run");
1290+
return {
1291+
output: "test-output",
1292+
init: params.init,
1293+
};
1294+
},
1295+
},
1296+
};
1297+
1298+
const result = await executeTask(task, { test: "data" });
1299+
1300+
// Verify the execution order:
1301+
// 1. Middleware starts
1302+
// 2. Init hook
1303+
// 3. Start hook
1304+
// 4. Run function
1305+
// 5. Success hook
1306+
// 6. Complete hook
1307+
// 7. Cleanup hooks
1308+
// 8. Middleware completes
1309+
expect(executionOrder).toEqual([
1310+
"middleware-before",
1311+
"init",
1312+
"start",
1313+
"run",
1314+
"success",
1315+
"complete",
1316+
"global-cleanup-1",
1317+
"global-cleanup-2",
1318+
"task-cleanup",
1319+
"middleware-after",
1320+
]);
1321+
1322+
// Verify the final result
1323+
expect(result).toEqual({
1324+
result: {
1325+
ok: true,
1326+
id: "test-run-id",
1327+
output: '{"json":{"output":"test-output","init":{"foo":"bar"}}}',
1328+
outputType: "application/super+json",
1329+
},
1330+
});
1331+
});
1332+
1333+
test("should call cleanup hooks even when task fails", async () => {
1334+
const executionOrder: string[] = [];
1335+
const expectedError = new Error("Task failed intentionally");
1336+
1337+
// Register global init hook
1338+
lifecycleHooks.registerGlobalInitHook({
1339+
id: "test-init",
1340+
fn: async () => {
1341+
executionOrder.push("init");
1342+
return {
1343+
foo: "bar",
1344+
};
1345+
},
1346+
});
1347+
1348+
// Register failure hook
1349+
lifecycleHooks.registerGlobalFailureHook({
1350+
id: "global-failure",
1351+
fn: async () => {
1352+
executionOrder.push("failure");
1353+
},
1354+
});
1355+
1356+
// Register complete hook
1357+
lifecycleHooks.registerGlobalCompleteHook({
1358+
id: "global-complete",
1359+
fn: async () => {
1360+
executionOrder.push("complete");
1361+
},
1362+
});
1363+
1364+
// Register cleanup hooks
1365+
lifecycleHooks.registerGlobalCleanupHook({
1366+
id: "global-cleanup",
1367+
fn: async ({ init }) => {
1368+
executionOrder.push("global-cleanup");
1369+
// Verify we have access to init data even after failure
1370+
expect(init).toEqual({ foo: "bar" });
1371+
},
1372+
});
1373+
1374+
lifecycleHooks.registerTaskCleanupHook("test-task", {
1375+
id: "task-cleanup",
1376+
fn: async ({ init }) => {
1377+
executionOrder.push("task-cleanup");
1378+
// Verify we have access to init data even after failure
1379+
expect(init).toEqual({ foo: "bar" });
1380+
},
1381+
});
1382+
1383+
const task = {
1384+
id: "test-task",
1385+
fns: {
1386+
run: async () => {
1387+
executionOrder.push("run");
1388+
throw expectedError;
1389+
},
1390+
},
1391+
};
1392+
1393+
const result = await executeTask(task, { test: "data" });
1394+
1395+
// Verify cleanup hooks are called even after failure
1396+
expect(executionOrder).toEqual([
1397+
"init",
1398+
"run",
1399+
"failure",
1400+
"complete",
1401+
"global-cleanup",
1402+
"task-cleanup",
1403+
]);
1404+
1405+
// Verify the error result
1406+
expect(result).toEqual({
1407+
result: {
1408+
ok: false,
1409+
id: "test-run-id",
1410+
error: {
1411+
type: "BUILT_IN_ERROR",
1412+
message: "Task failed intentionally",
1413+
name: "Error",
1414+
stackTrace: expect.any(String),
1415+
},
1416+
skippedRetrying: false,
1417+
},
1418+
});
1419+
});
12061420
});
12071421

12081422
function executeTask(task: TaskMetadataWithFunctions, payload: any) {

0 commit comments

Comments
 (0)