
Commit bb65b26

OOM retrying on larger machines (#1691)
* OOM retrying on larger machines
* Create forty-windows-shop.md
* Update forty-windows-shop.md
* Only retry again if the machine is different from the original
1 parent 4dd42cd commit bb65b26

7 files changed: +205 -34 lines changed


.changeset/forty-windows-shop.md

Lines changed: 5 additions & 0 deletions
````diff
@@ -0,0 +1,5 @@
+---
+"@trigger.dev/sdk": patch
+---
+
+Added the ability to retry runs that fail with an Out Of Memory (OOM) error on a larger machine.
````
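In task code, opting in looks like this — a minimal sketch based on the `docs/machines.mdx` changes later in this commit; the task id and payload shape are placeholders:

```ts
import { task } from "@trigger.dev/sdk/v3";

export const importTask = task({
  id: "import-task", // hypothetical id
  machine: "small-1x",
  retry: {
    // Applies only when an attempt dies with an OOM error:
    // the retry is then scheduled on this larger preset.
    outOfMemory: {
      machine: "medium-1x",
    },
  },
  run: async (payload: { url: string }) => {
    // ...
  },
});
```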

apps/webapp/app/v3/failedTaskRun.server.ts

Lines changed: 46 additions & 21 deletions
````diff
@@ -5,14 +5,14 @@ import {
   TaskRunExecutionRetry,
   TaskRunFailedExecutionResult,
 } from "@trigger.dev/core/v3";
+import type { Prisma, TaskRun } from "@trigger.dev/database";
+import * as semver from "semver";
 import { logger } from "~/services/logger.server";
+import { sharedQueueTasks } from "./marqs/sharedQueueConsumer.server";
 import { BaseService } from "./services/baseService.server";
-import { isFailableRunStatus, isFinalAttemptStatus } from "./taskStatus";
-import type { Prisma, TaskRun } from "@trigger.dev/database";
 import { CompleteAttemptService } from "./services/completeAttempt.server";
 import { CreateTaskRunAttemptService } from "./services/createTaskRunAttempt.server";
-import { sharedQueueTasks } from "./marqs/sharedQueueConsumer.server";
-import * as semver from "semver";
+import { isFailableRunStatus, isFinalAttemptStatus } from "./taskStatus";
 
 const FailedTaskRunRetryGetPayload = {
   select: {
@@ -180,13 +180,52 @@ export class FailedTaskRunRetryHelper extends BaseService {
     }
   }
 
-  static async getExecutionRetry({
+  static getExecutionRetry({
     run,
     execution,
   }: {
     run: TaskRunWithWorker;
     execution: TaskRunExecution;
-  }): Promise<TaskRunExecutionRetry | undefined> {
+  }): TaskRunExecutionRetry | undefined {
+    try {
+      const retryConfig = FailedTaskRunRetryHelper.getRetryConfig({ run, execution });
+      if (!retryConfig) {
+        return;
+      }
+
+      const delay = calculateNextRetryDelay(retryConfig, execution.attempt.number);
+
+      if (!delay) {
+        logger.debug("[FailedTaskRunRetryHelper] No more retries", {
+          run,
+          execution,
+        });
+
+        return;
+      }
+
+      return {
+        timestamp: Date.now() + delay,
+        delay,
+      };
+    } catch (error) {
+      logger.error("[FailedTaskRunRetryHelper] Failed to get execution retry", {
+        run,
+        execution,
+        error,
+      });
+
+      return;
+    }
+  }
+
+  static getRetryConfig({
+    run,
+    execution,
+  }: {
+    run: TaskRunWithWorker;
+    execution: TaskRunExecution;
+  }): RetryOptions | undefined {
     try {
       const retryConfig = run.lockedBy?.retryConfig;
 
@@ -247,21 +286,7 @@ export class FailedTaskRunRetryHelper extends BaseService {
         return;
       }
 
-      const delay = calculateNextRetryDelay(parsedRetryConfig.data, execution.attempt.number);
-
-      if (!delay) {
-        logger.debug("[FailedTaskRunRetryHelper] No more retries", {
-          run,
-          execution,
-        });
-
-        return;
-      }
-
-      return {
-        timestamp: Date.now() + delay,
-        delay,
-      };
+      return parsedRetryConfig.data;
     } catch (error) {
       logger.error("[FailedTaskRunRetryHelper] Failed to get execution retry", {
         run,
````
apps/webapp/app/v3/services/completeAttempt.server.ts

Lines changed: 66 additions & 2 deletions
````diff
@@ -1,13 +1,16 @@
 import { Attributes } from "@opentelemetry/api";
 import {
   TaskRunContext,
+  TaskRunError,
   TaskRunErrorCodes,
   TaskRunExecution,
   TaskRunExecutionResult,
   TaskRunExecutionRetry,
   TaskRunFailedExecutionResult,
   TaskRunSuccessfulExecutionResult,
+  exceptionEventEnhancer,
   flattenAttributes,
+  internalErrorFromUnexpectedExit,
   sanitizeError,
   shouldRetryError,
   taskRunErrorEnhancer,
@@ -233,7 +236,7 @@ export class CompleteAttemptService extends BaseService {
 
     if (!executionRetry && shouldInfer) {
       executionRetryInferred = true;
-      executionRetry = await FailedTaskRunRetryHelper.getExecutionRetry({
+      executionRetry = FailedTaskRunRetryHelper.getExecutionRetry({
         run: {
           ...taskRunAttempt.taskRun,
           lockedBy: taskRunAttempt.backgroundWorkerTask,
@@ -243,7 +246,47 @@
       });
     }
 
-    const retriableError = shouldRetryError(taskRunErrorEnhancer(completion.error));
+    let retriableError = shouldRetryError(taskRunErrorEnhancer(completion.error));
+    let isOOMRetry = false;
+
+    //OOM errors should retry (if an OOM machine is specified)
+    if (isOOMError(completion.error)) {
+      const retryConfig = FailedTaskRunRetryHelper.getRetryConfig({
+        run: {
+          ...taskRunAttempt.taskRun,
+          lockedBy: taskRunAttempt.backgroundWorkerTask,
+          lockedToVersion: taskRunAttempt.backgroundWorker,
+        },
+        execution,
+      });
+
+      if (
+        retryConfig?.outOfMemory?.machine &&
+        retryConfig.outOfMemory.machine !== taskRunAttempt.taskRun.machinePreset
+      ) {
+        //we will retry
+        isOOMRetry = true;
+        retriableError = true;
+        executionRetry = FailedTaskRunRetryHelper.getExecutionRetry({
+          run: {
+            ...taskRunAttempt.taskRun,
+            lockedBy: taskRunAttempt.backgroundWorkerTask,
+            lockedToVersion: taskRunAttempt.backgroundWorker,
+          },
+          execution,
+        });
+
+        //update the machine on the run
+        await this._prisma.taskRun.update({
+          where: {
+            id: taskRunAttempt.taskRunId,
+          },
+          data: {
+            machinePreset: retryConfig.outOfMemory.machine,
+          },
+        });
+      }
+    }
 
     if (
       retriableError &&
@@ -257,6 +300,7 @@
         taskRunAttempt,
         environment,
         checkpoint,
+        forceRequeue: isOOMRetry,
       });
     }
 
@@ -378,12 +422,14 @@
     executionRetryInferred,
     checkpointEventId,
     supportsLazyAttempts,
+    forceRequeue = false,
   }: {
     run: TaskRun;
     executionRetry: TaskRunExecutionRetry;
     executionRetryInferred: boolean;
     checkpointEventId?: string;
     supportsLazyAttempts: boolean;
+    forceRequeue?: boolean;
   }) {
     const retryViaQueue = () => {
       logger.debug("[CompleteAttemptService] Enqueuing retry attempt", { runId: run.id });
@@ -434,6 +480,12 @@
       return;
     }
 
+    if (forceRequeue) {
+      logger.debug("[CompleteAttemptService] Forcing retry via queue", { runId: run.id });
+      await retryViaQueue();
+      return;
+    }
+
     // Workers that never checkpoint between attempts will exit after completing their current attempt if the retry delay exceeds the threshold
     if (
       !this.opts.supportsRetryCheckpoints &&
@@ -466,13 +518,15 @@
     taskRunAttempt,
     environment,
     checkpoint,
+    forceRequeue = false,
   }: {
     execution: TaskRunExecution;
     executionRetry: TaskRunExecutionRetry;
     executionRetryInferred: boolean;
    taskRunAttempt: NonNullable<FoundAttempt>;
     environment: AuthenticatedEnvironment;
     checkpoint?: CheckpointData;
+    forceRequeue?: boolean;
   }) {
     const retryAt = new Date(executionRetry.timestamp);
 
@@ -533,6 +587,7 @@
       executionRetry,
       supportsLazyAttempts: taskRunAttempt.backgroundWorker.supportsLazyAttempts,
       executionRetryInferred,
+      forceRequeue,
     });
 
     return "RETRIED";
@@ -634,3 +689,12 @@ async function findAttempt(prismaClient: PrismaClientOrTransaction, friendlyId:
   },
  });
 }
+
+function isOOMError(error: TaskRunError) {
+  if (error.type !== "INTERNAL_ERROR") return false;
+  if (error.code === "TASK_PROCESS_OOM_KILLED" || error.code === "TASK_PROCESS_MAYBE_OOM_KILLED") {
+    return true;
+  }
+
+  return false;
+}
````
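The new branch boils down to one predicate. Restated as a standalone sketch — the error codes and preset comparison are taken directly from the diff; the function name is mine:

```ts
import type { TaskRunError } from "@trigger.dev/core/v3";

// Mirrors the guard above: retry on a bigger machine only when the error
// is one of the two OOM codes AND the configured OOM preset differs from
// the preset the failed attempt already ran on.
function shouldRetryOnLargerMachine(
  error: TaskRunError,
  currentPreset: string | null,
  oomMachine: string | undefined
): boolean {
  const isOOM =
    error.type === "INTERNAL_ERROR" &&
    (error.code === "TASK_PROCESS_OOM_KILLED" || error.code === "TASK_PROCESS_MAYBE_OOM_KILLED");

  return isOOM && oomMachine !== undefined && oomMachine !== currentPreset;
}
```

Because the run's `machinePreset` is updated to the OOM machine before the retry, a second OOM on the upgraded machine fails this check — which is what the commit message's last bullet ("Only retry again if the machine is different from the original") guards against.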

docs/machines.mdx

Lines changed: 32 additions & 3 deletions
````diff
@@ -8,9 +8,7 @@ The `machine` configuration is optional. Using higher spec machines will increas
 ```ts /trigger/heavy-task.ts
 export const heavyTask = task({
   id: "heavy-task",
-  machine: {
-    preset: "large-1x",
-  },
+  machine: "large-1x",
   run: async ({ payload, ctx }) => {
     //...
   },
@@ -28,6 +26,37 @@ export const config: TriggerConfig = {
 };
 ```
 
+## Out Of Memory errors
+
+Sometimes you might see one of your runs fail with an "Out Of Memory" error.
+
+> TASK_PROCESS_OOM_KILLED. Your task ran out of memory. Try increasing the machine specs. If this doesn't fix it there might be a memory leak.
+
+If this happens regularly you need to either optimize the memory-efficiency of your code, or increase the machine.
+
+### Retrying with a larger machine
+
+If you are seeing rare OOM errors, you can add a setting to your task to retry with a large machine if you get an OOM error:
+
+```ts /trigger/heavy-task.ts
+export const yourTask = task({
+  id: "your-task",
+  machine: "medium-1x",
+  retry: {
+    outOfMemory: {
+      machine: "large-1x",
+    },
+  },
+  run: async (payload: any, { ctx }) => {
+    //...
+  },
+});
+```
+
+<Note>
+  This will only retry the task if you get an OOM error. It won't permanently change the machine that a new run starts on, so if you consistently see OOM errors you should change the machine in the `machine` property.
+</Note>
+
 ## Machine configurations
 
 | Preset | vCPU | Memory | Disk space |
````
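Inside a run you can check which preset the attempt actually landed on via `ctx.machine`, as the example task at the bottom of this commit does — a minimal sketch with a hypothetical task id:

```ts
import { logger, task } from "@trigger.dev/sdk/v3";

export const reportMachine = task({
  id: "report-machine", // hypothetical id
  machine: "small-1x",
  retry: {
    outOfMemory: { machine: "large-1x" },
  },
  run: async (_payload: unknown, { ctx }) => {
    // After an OOM retry this logs the upgraded preset, e.g. "large-1x".
    logger.info(`Running on ${ctx.machine?.name}`);
  },
});
```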

packages/core/src/v3/schemas/schemas.ts

Lines changed: 13 additions & 3 deletions
````diff
@@ -1,6 +1,6 @@
 import { z } from "zod";
 import { RequireKeys } from "../types/index.js";
-import { MachineConfig, MachinePreset, TaskRunExecution } from "./common.js";
+import { MachineConfig, MachinePreset, MachinePresetName, TaskRunExecution } from "./common.js";
 
 /*
 WARNING: Never import anything from ./messages here. If it's needed in both, put it here instead.
@@ -95,15 +95,25 @@ export const RetryOptions = z.object({
   * This can be useful to prevent the thundering herd problem where all retries happen at the same time.
   */
  randomize: z.boolean().optional(),
+
+  /** If a run fails with an Out Of Memory (OOM) error and you have this set, it will retry with the machine you specify.
+   * Note: it will not default to this [machine](https://trigger.dev/docs/machines) for new runs, only for failures caused by OOM errors.
+   * So if you frequently have attempts failing with OOM errors, you should set the [default machine](https://trigger.dev/docs/machines) to be higher.
+   */
+  outOfMemory: z
+    .object({
+      machine: MachinePresetName.optional(),
+    })
+    .optional(),
 });
 
 export type RetryOptions = z.infer<typeof RetryOptions>;
 
 export const QueueOptions = z.object({
   /** You can define a shared queue and then pass the name in to your task.
-   * 
+   *
    * @example
-   * 
+   *
    * ```ts
   * const myQueue = queue({
       name: "my-queue",
````
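Because `RetryOptions` is a zod schema, the new field is validated like the rest; a quick sketch, assuming the schema object is re-exported from `@trigger.dev/core/v3` alongside the types the webapp imports above:

```ts
import { RetryOptions } from "@trigger.dev/core/v3";

// MachinePresetName constrains the value: "large-1x" parses, an unknown
// preset name would fail validation.
const parsed = RetryOptions.safeParse({
  maxAttempts: 3,
  outOfMemory: { machine: "large-1x" },
});

if (parsed.success) {
  console.log(parsed.data.outOfMemory?.machine); // "large-1x"
}
```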

packages/core/src/v3/types/tasks.ts

Lines changed: 2 additions & 5 deletions
````diff
@@ -202,17 +202,14 @@ type CommonTaskOptions<
    * ```
    */
   queue?: QueueOptions;
-  /** Configure the spec of the machine you want your task to run on.
+  /** Configure the spec of the [machine](https://trigger.dev/docs/machines) you want your task to run on.
    *
    * @example
    *
    * ```ts
    * export const heavyTask = task({
        id: "heavy-task",
-       machine: {
-         cpu: 2,
-         memory: 4,
-       },
+       machine: "medium-1x",
        run: async ({ payload, ctx }) => {
          //...
        },
````
New example task

Lines changed: 41 additions & 0 deletions
````diff
@@ -0,0 +1,41 @@
+import { logger, task } from "@trigger.dev/sdk/v3";
+import { setTimeout } from "timers/promises";
+
+export const oomTask = task({
+  id: "oom-task",
+  machine: "micro",
+  retry: {
+    outOfMemory: {
+      machine: "small-1x",
+    },
+  },
+  run: async ({ succeedOnLargerMachine }: { succeedOnLargerMachine: boolean }, { ctx }) => {
+    logger.info("running out of memory below this line");
+
+    logger.info(`Running on ${ctx.machine?.name}`);
+
+    await setTimeout(2000);
+
+    if (ctx.machine?.name !== "micro" && succeedOnLargerMachine) {
+      logger.info("Going to succeed now");
+      return {
+        success: true,
+      };
+    }
+
+    let a = "a";
+
+    try {
+      while (true) {
+        a += a;
+      }
+    } catch (error) {
+      logger.error(error instanceof Error ? error.message : "Unknown error", { error });
+
+      let b = [];
+      while (true) {
+        b.push(a.replace(/a/g, "b"));
+      }
+    }
+  },
+});
````
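Triggering the example to watch an OOM retry end-to-end might look like this — a sketch; the import path is assumed, since the new file's path isn't shown here:

```ts
import { oomTask } from "./trigger/oom"; // assumed path

async function main() {
  // First attempt OOMs on "micro"; the retry runs on "small-1x" and succeeds.
  const handle = await oomTask.trigger({ succeedOnLargerMachine: true });
  console.log(`Triggered run ${handle.id}`);
}

main().catch(console.error);
```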
