Skip to content

Commit 4efe759

Browse files
committed
add backoff retries to messages with acks
1 parent 5825272 commit 4efe759

File tree

1 file changed

+179
-57
lines changed

1 file changed

+179
-57
lines changed

apps/coordinator/src/index.ts

Lines changed: 179 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import {
1111
} from "@trigger.dev/core/v3";
1212
import { ZodNamespace } from "@trigger.dev/core/v3/zodNamespace";
1313
import { ZodSocketConnection } from "@trigger.dev/core/v3/zodSocket";
14-
import { HttpReply, getTextBody } from "@trigger.dev/core/v3/apps";
14+
import { ExponentialBackoff, HttpReply, getTextBody } from "@trigger.dev/core/v3/apps";
1515
import { ChaosMonkey } from "./chaosMonkey";
1616
import { Checkpointer } from "./checkpointer";
1717
import { boolFromEnv, numFromEnv } from "./util";
@@ -37,6 +37,22 @@ const chaosMonkey = new ChaosMonkey(
3737
!!process.env.CHAOS_MONKEY_DISABLE_DELAYS
3838
);
3939

40+
const backoff = new ExponentialBackoff("FullJitter", {
41+
maxRetries: 7,
42+
});
43+
44+
function serializeError(error: unknown) {
45+
if (error instanceof Error) {
46+
return {
47+
name: error.name,
48+
message: error.message,
49+
stack: error.stack,
50+
};
51+
}
52+
53+
return error;
54+
}
55+
4056
class CheckpointReadinessTimeoutError extends Error {}
4157
class CheckpointCancelError extends Error {}
4258

@@ -612,12 +628,36 @@ class TaskCoordinator {
612628
log.log("Handling READY_FOR_LAZY_ATTEMPT");
613629

614630
try {
615-
const lazyAttempt = await this.#platformSocket?.sendWithAck("READY_FOR_LAZY_ATTEMPT", {
616-
...message,
617-
envId: socket.data.envId,
618-
});
631+
const lazyAttempt = await backoff
632+
.max(2) // The run controller expects a reply in 10s so we can't have long retry delays
633+
.maxRetries(5)
634+
.execute(async () => {
635+
return await this.#platformSocket?.sendWithAck("READY_FOR_LAZY_ATTEMPT", {
636+
...message,
637+
envId: socket.data.envId,
638+
});
639+
});
640+
641+
if (!lazyAttempt.success) {
642+
log.error("Failed to send READY_FOR_LAZY_ATTEMPT", {
643+
error: serializeError(lazyAttempt.error),
644+
cause: lazyAttempt.cause,
645+
});
646+
647+
await crashRun({
648+
name: "ReadyForLazyAttemptError",
649+
message:
650+
lazyAttempt.error instanceof Error
651+
? `[${lazyAttempt.cause}] ${lazyAttempt.error.name}: ${lazyAttempt.error.message}`
652+
: `[${lazyAttempt.cause}] ${lazyAttempt.error}`,
653+
});
654+
655+
return;
656+
}
657+
658+
const lazyAttemptResponse = lazyAttempt.result;
619659

620-
if (!lazyAttempt) {
660+
if (!lazyAttemptResponse) {
621661
log.error("no lazy attempt ack");
622662

623663
await crashRun({
@@ -628,8 +668,10 @@ class TaskCoordinator {
628668
return;
629669
}
630670

631-
if (!lazyAttempt.success) {
632-
log.error("failed to get lazy attempt payload", { reason: lazyAttempt.reason });
671+
if (!lazyAttemptResponse.success) {
672+
log.error("failed to get lazy attempt payload", {
673+
reason: lazyAttemptResponse.reason,
674+
});
633675

634676
await crashRun({
635677
name: "ReadyForLazyAttemptError",
@@ -643,7 +685,7 @@ class TaskCoordinator {
643685

644686
socket.emit("EXECUTE_TASK_RUN_LAZY_ATTEMPT", {
645687
version: "v1",
646-
lazyPayload: lazyAttempt.lazyPayload,
688+
lazyPayload: lazyAttemptResponse.lazyPayload,
647689
});
648690
} catch (error) {
649691
if (error instanceof ChaosMonkey.Error) {
@@ -949,20 +991,39 @@ class TaskCoordinator {
949991

950992
log.addFields({ checkpoint });
951993

952-
const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
953-
version: "v1",
954-
runId: socket.data.runId,
955-
attemptFriendlyId: message.attemptFriendlyId,
956-
docker: checkpoint.docker,
957-
location: checkpoint.location,
958-
reason: {
959-
type: "WAIT_FOR_DURATION",
960-
ms: message.ms,
961-
now: message.now,
962-
},
994+
const checkpointCreated = await backoff.execute(async () => {
995+
return await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
996+
version: "v1",
997+
runId: socket.data.runId,
998+
attemptFriendlyId: message.attemptFriendlyId,
999+
docker: checkpoint.docker,
1000+
location: checkpoint.location,
1001+
reason: {
1002+
type: "WAIT_FOR_DURATION",
1003+
ms: message.ms,
1004+
now: message.now,
1005+
},
1006+
});
9631007
});
9641008

965-
if (ack?.keepRunAlive) {
1009+
if (!checkpointCreated.success) {
1010+
log.error("Failed to send CHECKPOINT_CREATED", {
1011+
error: serializeError(checkpointCreated.error),
1012+
cause: checkpointCreated.cause,
1013+
});
1014+
1015+
await crashRun({
1016+
name: "WaitForDurationCheckpointError",
1017+
message:
1018+
checkpointCreated.error instanceof Error
1019+
? `[${checkpointCreated.cause}] ${checkpointCreated.error.name}: ${checkpointCreated.error.message}`
1020+
: `[${checkpointCreated.cause}] ${checkpointCreated.error}`,
1021+
});
1022+
1023+
return;
1024+
}
1025+
1026+
if (checkpointCreated.result?.keepRunAlive) {
9661027
log.log("keeping run alive after duration checkpoint");
9671028
return;
9681029
}
@@ -1042,19 +1103,38 @@ class TaskCoordinator {
10421103
socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`;
10431104
log.log("WAIT_FOR_TASK set requiresCheckpointResumeWithMessage");
10441105

1045-
const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
1046-
version: "v1",
1047-
runId: socket.data.runId,
1048-
attemptFriendlyId: message.attemptFriendlyId,
1049-
docker: checkpoint.docker,
1050-
location: checkpoint.location,
1051-
reason: {
1052-
type: "WAIT_FOR_TASK",
1053-
friendlyId: message.friendlyId,
1054-
},
1106+
const checkpointCreated = await backoff.execute(async () => {
1107+
return await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
1108+
version: "v1",
1109+
runId: socket.data.runId,
1110+
attemptFriendlyId: message.attemptFriendlyId,
1111+
docker: checkpoint.docker,
1112+
location: checkpoint.location,
1113+
reason: {
1114+
type: "WAIT_FOR_TASK",
1115+
friendlyId: message.friendlyId,
1116+
},
1117+
});
10551118
});
10561119

1057-
if (ack?.keepRunAlive) {
1120+
if (!checkpointCreated.success) {
1121+
log.error("Failed to send CHECKPOINT_CREATED", {
1122+
error: serializeError(checkpointCreated.error),
1123+
cause: checkpointCreated.cause,
1124+
});
1125+
1126+
await crashRun({
1127+
name: "WaitForTaskCheckpointError",
1128+
message:
1129+
checkpointCreated.error instanceof Error
1130+
? `[${checkpointCreated.cause}] ${checkpointCreated.error.name}: ${checkpointCreated.error.message}`
1131+
: `[${checkpointCreated.cause}] ${checkpointCreated.error}`,
1132+
});
1133+
1134+
return;
1135+
}
1136+
1137+
if (checkpointCreated.result?.keepRunAlive) {
10581138
socket.data.requiresCheckpointResumeWithMessage = undefined;
10591139
log.log("keeping run alive after task checkpoint");
10601140
return;
@@ -1135,20 +1215,39 @@ class TaskCoordinator {
11351215
socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`;
11361216
log.log("WAIT_FOR_BATCH set checkpoint");
11371217

1138-
const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
1139-
version: "v1",
1140-
runId: socket.data.runId,
1141-
attemptFriendlyId: message.attemptFriendlyId,
1142-
docker: checkpoint.docker,
1143-
location: checkpoint.location,
1144-
reason: {
1145-
type: "WAIT_FOR_BATCH",
1146-
batchFriendlyId: message.batchFriendlyId,
1147-
runFriendlyIds: message.runFriendlyIds,
1148-
},
1218+
const checkpointCreated = await backoff.execute(async () => {
1219+
return await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
1220+
version: "v1",
1221+
runId: socket.data.runId,
1222+
attemptFriendlyId: message.attemptFriendlyId,
1223+
docker: checkpoint.docker,
1224+
location: checkpoint.location,
1225+
reason: {
1226+
type: "WAIT_FOR_BATCH",
1227+
batchFriendlyId: message.batchFriendlyId,
1228+
runFriendlyIds: message.runFriendlyIds,
1229+
},
1230+
});
11491231
});
11501232

1151-
if (ack?.keepRunAlive) {
1233+
if (!checkpointCreated.success) {
1234+
log.error("Failed to send CHECKPOINT_CREATED", {
1235+
error: serializeError(checkpointCreated.error),
1236+
cause: checkpointCreated.cause,
1237+
});
1238+
1239+
await crashRun({
1240+
name: "WaitForBatchCheckpointError",
1241+
message:
1242+
checkpointCreated.error instanceof Error
1243+
? `[${checkpointCreated.cause}] ${checkpointCreated.error.name}: ${checkpointCreated.error.message}`
1244+
: `[${checkpointCreated.cause}] ${checkpointCreated.error}`,
1245+
});
1246+
1247+
return;
1248+
}
1249+
1250+
if (checkpointCreated.result?.keepRunAlive) {
11521251
socket.data.requiresCheckpointResumeWithMessage = undefined;
11531252
log.log("keeping run alive after batch checkpoint");
11541253
return;
@@ -1239,26 +1338,49 @@ class TaskCoordinator {
12391338
try {
12401339
await chaosMonkey.call({ throwErrors: false });
12411340

1242-
const createAttempt = await this.#platformSocket?.sendWithAck(
1243-
"CREATE_TASK_RUN_ATTEMPT",
1244-
{
1245-
runId: message.runId,
1246-
envId: socket.data.envId,
1247-
}
1248-
);
1341+
const createAttempt = await backoff
1342+
.max(3) // The run controller expects a reply in 15s so we can't have long retry delays
1343+
.maxRetries(5)
1344+
.execute(async () => {
1345+
return await this.#platformSocket?.sendWithAck("CREATE_TASK_RUN_ATTEMPT", {
1346+
runId: message.runId,
1347+
envId: socket.data.envId,
1348+
});
1349+
});
1350+
1351+
if (!createAttempt.success) {
1352+
log.error("Failed to send CREATE_TASK_RUN_ATTEMPT", {
1353+
error: serializeError(createAttempt.error),
1354+
cause: createAttempt.cause,
1355+
});
1356+
1357+
callback({
1358+
success: false,
1359+
reason:
1360+
createAttempt.error instanceof Error
1361+
? `[${createAttempt.cause}] ${createAttempt.error.name}: ${createAttempt.error.message}`
1362+
: `[${createAttempt.cause}] ${createAttempt.error}`,
1363+
});
12491364

1250-
if (!createAttempt?.success) {
1251-
log.debug("no ack while creating attempt", { reason: createAttempt?.reason });
1252-
callback({ success: false, reason: createAttempt?.reason });
12531365
return;
12541366
}
12551367

1256-
updateAttemptFriendlyId(createAttempt.executionPayload.execution.attempt.id);
1257-
updateAttemptNumber(createAttempt.executionPayload.execution.attempt.number);
1368+
const createAttemptResponse = createAttempt.result;
1369+
1370+
if (!createAttemptResponse?.success) {
1371+
log.debug("no ack while creating attempt", { reason: createAttemptResponse?.reason });
1372+
callback({ success: false, reason: createAttemptResponse?.reason });
1373+
return;
1374+
}
1375+
1376+
const { executionPayload } = createAttemptResponse;
1377+
1378+
updateAttemptFriendlyId(executionPayload.execution.attempt.id);
1379+
updateAttemptNumber(executionPayload.execution.attempt.number);
12581380

12591381
callback({
12601382
success: true,
1261-
executionPayload: createAttempt.executionPayload,
1383+
executionPayload,
12621384
});
12631385
} catch (error) {
12641386
log.error("CREATE_TASK_RUN_ATTEMPT error", { error });

0 commit comments

Comments
 (0)