From 4b4c139282ab64d402b613a6d5c6d0bbe346b704 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 14 Feb 2025 14:48:31 +0000 Subject: [PATCH 1/6] add timeout support to sendWithAck --- packages/core/src/v3/zodSocket.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/core/src/v3/zodSocket.ts b/packages/core/src/v3/zodSocket.ts index 66d45ef120..160620c42c 100644 --- a/packages/core/src/v3/zodSocket.ts +++ b/packages/core/src/v3/zodSocket.ts @@ -293,7 +293,8 @@ export class ZodSocketMessageSender>( type: K, - payload: z.input> + payload: z.input>, + timeout?: number ): Promise>> { const schema = this.#schema[type]?.["message"]; @@ -307,8 +308,10 @@ export class ZodSocketMessageSender Date: Fri, 14 Feb 2025 14:50:35 +0000 Subject: [PATCH 2/6] coordinator will retry completion submission --- apps/coordinator/src/index.ts | 93 +++++++++++++++++---- apps/webapp/app/v3/handleSocketIo.server.ts | 38 +++++++++ packages/core/src/v3/schemas/messages.ts | 14 ++++ 3 files changed, 131 insertions(+), 14 deletions(-) diff --git a/apps/coordinator/src/index.ts b/apps/coordinator/src/index.ts index b493f3913f..c3e5e7c3c6 100644 --- a/apps/coordinator/src/index.ts +++ b/apps/coordinator/src/index.ts @@ -11,7 +11,7 @@ import { } from "@trigger.dev/core/v3"; import { ZodNamespace } from "@trigger.dev/core/v3/zodNamespace"; import { ZodSocketConnection } from "@trigger.dev/core/v3/zodSocket"; -import { HttpReply, getTextBody } from "@trigger.dev/core/v3/apps"; +import { ExponentialBackoff, HttpReply, getTextBody } from "@trigger.dev/core/v3/apps"; import { ChaosMonkey } from "./chaosMonkey"; import { Checkpointer } from "./checkpointer"; import { boolFromEnv, numFromEnv, safeJsonParse } from "./util"; @@ -30,6 +30,11 @@ const PLATFORM_WS_PORT = process.env.PLATFORM_WS_PORT || 3030; const PLATFORM_SECRET = process.env.PLATFORM_SECRET || "coordinator-secret"; const SECURE_CONNECTION = ["1", "true"].includes(process.env.SECURE_CONNECTION ?? "false"); +const TASK_RUN_COMPLETED_WITH_ACK_TIMEOUT_MS = + parseInt(process.env.TASK_RUN_COMPLETED_WITH_ACK_TIMEOUT_MS || "") || 30_000; +const TASK_RUN_COMPLETED_WITH_ACK_MAX_RETRIES = + parseInt(process.env.TASK_RUN_COMPLETED_WITH_ACK_MAX_RETRIES || "") || 5; + const logger = new SimpleStructuredLogger("coordinator", undefined, { nodeName: NODE_NAME }); const chaosMonkey = new ChaosMonkey( !!process.env.CHAOS_MONKEY_ENABLED, @@ -720,19 +725,79 @@ class TaskCoordinator { await chaosMonkey.call({ throwErrors: false }); - const completeWithoutCheckpoint = (shouldExit: boolean) => { + const sendCompletionWithAck = async (): Promise => { + try { + const response = await this.#platformSocket?.sendWithAck( + "TASK_RUN_COMPLETED_WITH_ACK", + { + version: "v2", + execution, + completion, + }, + TASK_RUN_COMPLETED_WITH_ACK_TIMEOUT_MS + ); + + if (!response) { + log.error("TASK_RUN_COMPLETED_WITH_ACK: no response"); + return false; + } + + if (!response.success) { + log.error("TASK_RUN_COMPLETED_WITH_ACK: error response", { + error: response.error, + }); + return false; + } + + log.log("TASK_RUN_COMPLETED_WITH_ACK: successful response"); + return true; + } catch (error) { + log.error("TASK_RUN_COMPLETED_WITH_ACK: threw error", { error }); + return false; + } + }; + + const completeWithoutCheckpoint = async (shouldExit: boolean) => { const supportsRetryCheckpoints = message.version === "v1"; - this.#platformSocket?.send("TASK_RUN_COMPLETED", { - version: supportsRetryCheckpoints ? "v1" : "v2", - execution, - completion, - }); callback({ willCheckpointAndRestore: false, shouldExit }); + + if (supportsRetryCheckpoints) { + // This is only here for backwards compat + this.#platformSocket?.send("TASK_RUN_COMPLETED", { + version: "v1", + execution, + completion, + }); + } else { + // 99.99% of runs should end up here + + const completedWithAckBackoff = new ExponentialBackoff("FullJitter").maxRetries( + TASK_RUN_COMPLETED_WITH_ACK_MAX_RETRIES + ); + + const result = await completedWithAckBackoff.execute( + async ({ retry, delay, elapsedMs }) => { + logger.log("TASK_RUN_COMPLETED_WITH_ACK: sending with backoff", { + retry, + delay, + elapsedMs, + }); + return await sendCompletionWithAck(); + } + ); + + if (!result.success) { + logger.error("TASK_RUN_COMPLETED_WITH_ACK: failed to send with backoff", result); + return; + } + + logger.log("TASK_RUN_COMPLETED_WITH_ACK: sent with backoff", result); + } }; if (completion.ok) { - completeWithoutCheckpoint(true); + await completeWithoutCheckpoint(true); return; } @@ -740,17 +805,17 @@ class TaskCoordinator { completion.error.type === "INTERNAL_ERROR" && completion.error.code === "TASK_RUN_CANCELLED" ) { - completeWithoutCheckpoint(true); + await completeWithoutCheckpoint(true); return; } if (completion.retry === undefined) { - completeWithoutCheckpoint(true); + await completeWithoutCheckpoint(true); return; } if (completion.retry.delay < this.#delayThresholdInMs) { - completeWithoutCheckpoint(false); + await completeWithoutCheckpoint(false); // Prevents runs that fail fast from never sending a heartbeat this.#sendRunHeartbeat(socket.data.runId); @@ -759,7 +824,7 @@ class TaskCoordinator { } if (message.version === "v2") { - completeWithoutCheckpoint(true); + await completeWithoutCheckpoint(true); return; } @@ -768,7 +833,7 @@ class TaskCoordinator { const willCheckpointAndRestore = canCheckpoint || willSimulate; if (!willCheckpointAndRestore) { - completeWithoutCheckpoint(false); + await completeWithoutCheckpoint(false); return; } @@ -792,7 +857,7 @@ class TaskCoordinator { if (!checkpoint) { log.error("Failed to checkpoint"); - completeWithoutCheckpoint(false); + await completeWithoutCheckpoint(false); return; } diff --git a/apps/webapp/app/v3/handleSocketIo.server.ts b/apps/webapp/app/v3/handleSocketIo.server.ts index e1c97d2e0a..d06e6240fe 100644 --- a/apps/webapp/app/v3/handleSocketIo.server.ts +++ b/apps/webapp/app/v3/handleSocketIo.server.ts @@ -136,6 +136,44 @@ function createCoordinatorNamespace(io: Server) { checkpoint: message.checkpoint, }); }, + TASK_RUN_COMPLETED_WITH_ACK: async (message) => { + try { + const completeAttempt = new CompleteAttemptService({ + supportsRetryCheckpoints: message.version === "v1", + }); + await completeAttempt.call({ + completion: message.completion, + execution: message.execution, + checkpoint: message.checkpoint, + }); + + return { + success: true, + }; + } catch (error) { + const friendlyError = + error instanceof Error + ? { + name: error.name, + message: error.message, + stack: error.stack, + } + : { + name: "UnknownError", + message: String(error), + }; + + logger.error("Error while completing attempt with ack", { + error: friendlyError, + message, + }); + + return { + success: false, + error: friendlyError, + }; + } + }, TASK_RUN_FAILED_TO_RUN: async (message) => { await sharedQueueTasks.taskRunFailed(message.completion); }, diff --git a/packages/core/src/v3/schemas/messages.ts b/packages/core/src/v3/schemas/messages.ts index 29b1b1a825..431504c56b 100644 --- a/packages/core/src/v3/schemas/messages.ts +++ b/packages/core/src/v3/schemas/messages.ts @@ -435,6 +435,20 @@ export const CoordinatorToPlatformMessages = { .optional(), }), }, + TASK_RUN_COMPLETED_WITH_ACK: { + message: z.object({ + version: z.enum(["v1", "v2"]).default("v2"), + execution: ProdTaskRunExecution, + completion: TaskRunExecutionResult, + checkpoint: z + .object({ + docker: z.boolean(), + location: z.string(), + }) + .optional(), + }), + callback: AckCallbackResult, + }, TASK_RUN_FAILED_TO_RUN: { message: z.object({ version: z.literal("v1").default("v1"), From 71862dd618089bbbcc5321f77a9b9cca6a2fec1b Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 14 Feb 2025 15:00:38 +0000 Subject: [PATCH 3/6] actually retry --- apps/coordinator/src/index.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/apps/coordinator/src/index.ts b/apps/coordinator/src/index.ts index c3e5e7c3c6..6a03f4ab7f 100644 --- a/apps/coordinator/src/index.ts +++ b/apps/coordinator/src/index.ts @@ -783,7 +783,12 @@ class TaskCoordinator { delay, elapsedMs, }); - return await sendCompletionWithAck(); + + const success = await sendCompletionWithAck(); + + if (!success) { + throw new Error("Failed to send completion with ack"); + } } ); From 24fc9ae389d604abeb48fe2df6f3f373e95ef29e Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 14 Feb 2025 15:09:21 +0000 Subject: [PATCH 4/6] increase default retries --- apps/coordinator/src/index.ts | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/apps/coordinator/src/index.ts b/apps/coordinator/src/index.ts index 6a03f4ab7f..de1a88ffe9 100644 --- a/apps/coordinator/src/index.ts +++ b/apps/coordinator/src/index.ts @@ -33,7 +33,7 @@ const SECURE_CONNECTION = ["1", "true"].includes(process.env.SECURE_CONNECTION ? const TASK_RUN_COMPLETED_WITH_ACK_TIMEOUT_MS = parseInt(process.env.TASK_RUN_COMPLETED_WITH_ACK_TIMEOUT_MS || "") || 30_000; const TASK_RUN_COMPLETED_WITH_ACK_MAX_RETRIES = - parseInt(process.env.TASK_RUN_COMPLETED_WITH_ACK_MAX_RETRIES || "") || 5; + parseInt(process.env.TASK_RUN_COMPLETED_WITH_ACK_MAX_RETRIES || "") || 7; const logger = new SimpleStructuredLogger("coordinator", undefined, { nodeName: NODE_NAME }); const chaosMonkey = new ChaosMonkey( @@ -783,12 +783,7 @@ class TaskCoordinator { delay, elapsedMs, }); - - const success = await sendCompletionWithAck(); - - if (!success) { - throw new Error("Failed to send completion with ack"); - } + return await sendCompletionWithAck(); } ); From be4f23bee6a9c86f80a9a37abd296dc77c968c75 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 14 Feb 2025 15:11:56 +0000 Subject: [PATCH 5/6] something went wrong there, add this back in --- apps/coordinator/src/index.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/apps/coordinator/src/index.ts b/apps/coordinator/src/index.ts index de1a88ffe9..aa5a6062b7 100644 --- a/apps/coordinator/src/index.ts +++ b/apps/coordinator/src/index.ts @@ -783,7 +783,12 @@ class TaskCoordinator { delay, elapsedMs, }); - return await sendCompletionWithAck(); + + const success = await sendCompletionWithAck(); + + if (!success) { + throw new Error("Failed to send completion with ack"); + } } ); From 7299840366b9e910e4d628cd23e0cdf44db6be24 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 14 Feb 2025 15:14:09 +0000 Subject: [PATCH 6/6] add changeset --- .changeset/witty-jars-approve.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changeset/witty-jars-approve.md diff --git a/.changeset/witty-jars-approve.md b/.changeset/witty-jars-approve.md new file mode 100644 index 0000000000..9b4070358b --- /dev/null +++ b/.changeset/witty-jars-approve.md @@ -0,0 +1,6 @@ +--- +"@trigger.dev/core": patch +--- + +- Add new run completion submission message with ack +- Add timeout support to sendWithAck