diff --git a/.changeset/witty-jars-approve.md b/.changeset/witty-jars-approve.md new file mode 100644 index 0000000000..9b4070358b --- /dev/null +++ b/.changeset/witty-jars-approve.md @@ -0,0 +1,6 @@ +--- +"@trigger.dev/core": patch +--- + +- Add new run completion submission message with ack +- Add timeout support to sendWithAck diff --git a/apps/coordinator/src/index.ts b/apps/coordinator/src/index.ts index b493f3913f..aa5a6062b7 100644 --- a/apps/coordinator/src/index.ts +++ b/apps/coordinator/src/index.ts @@ -11,7 +11,7 @@ import { } from "@trigger.dev/core/v3"; import { ZodNamespace } from "@trigger.dev/core/v3/zodNamespace"; import { ZodSocketConnection } from "@trigger.dev/core/v3/zodSocket"; -import { HttpReply, getTextBody } from "@trigger.dev/core/v3/apps"; +import { ExponentialBackoff, HttpReply, getTextBody } from "@trigger.dev/core/v3/apps"; import { ChaosMonkey } from "./chaosMonkey"; import { Checkpointer } from "./checkpointer"; import { boolFromEnv, numFromEnv, safeJsonParse } from "./util"; @@ -30,6 +30,11 @@ const PLATFORM_WS_PORT = process.env.PLATFORM_WS_PORT || 3030; const PLATFORM_SECRET = process.env.PLATFORM_SECRET || "coordinator-secret"; const SECURE_CONNECTION = ["1", "true"].includes(process.env.SECURE_CONNECTION ?? "false"); +const TASK_RUN_COMPLETED_WITH_ACK_TIMEOUT_MS = + parseInt(process.env.TASK_RUN_COMPLETED_WITH_ACK_TIMEOUT_MS || "") || 30_000; +const TASK_RUN_COMPLETED_WITH_ACK_MAX_RETRIES = + parseInt(process.env.TASK_RUN_COMPLETED_WITH_ACK_MAX_RETRIES || "") || 7; + const logger = new SimpleStructuredLogger("coordinator", undefined, { nodeName: NODE_NAME }); const chaosMonkey = new ChaosMonkey( !!process.env.CHAOS_MONKEY_ENABLED, @@ -720,19 +725,84 @@ class TaskCoordinator { await chaosMonkey.call({ throwErrors: false }); - const completeWithoutCheckpoint = (shouldExit: boolean) => { + const sendCompletionWithAck = async (): Promise => { + try { + const response = await this.#platformSocket?.sendWithAck( + "TASK_RUN_COMPLETED_WITH_ACK", + { + version: "v2", + execution, + completion, + }, + TASK_RUN_COMPLETED_WITH_ACK_TIMEOUT_MS + ); + + if (!response) { + log.error("TASK_RUN_COMPLETED_WITH_ACK: no response"); + return false; + } + + if (!response.success) { + log.error("TASK_RUN_COMPLETED_WITH_ACK: error response", { + error: response.error, + }); + return false; + } + + log.log("TASK_RUN_COMPLETED_WITH_ACK: successful response"); + return true; + } catch (error) { + log.error("TASK_RUN_COMPLETED_WITH_ACK: threw error", { error }); + return false; + } + }; + + const completeWithoutCheckpoint = async (shouldExit: boolean) => { const supportsRetryCheckpoints = message.version === "v1"; - this.#platformSocket?.send("TASK_RUN_COMPLETED", { - version: supportsRetryCheckpoints ? "v1" : "v2", - execution, - completion, - }); callback({ willCheckpointAndRestore: false, shouldExit }); + + if (supportsRetryCheckpoints) { + // This is only here for backwards compat + this.#platformSocket?.send("TASK_RUN_COMPLETED", { + version: "v1", + execution, + completion, + }); + } else { + // 99.99% of runs should end up here + + const completedWithAckBackoff = new ExponentialBackoff("FullJitter").maxRetries( + TASK_RUN_COMPLETED_WITH_ACK_MAX_RETRIES + ); + + const result = await completedWithAckBackoff.execute( + async ({ retry, delay, elapsedMs }) => { + logger.log("TASK_RUN_COMPLETED_WITH_ACK: sending with backoff", { + retry, + delay, + elapsedMs, + }); + + const success = await sendCompletionWithAck(); + + if (!success) { + throw new Error("Failed to send completion with ack"); + } + } + ); + + if (!result.success) { + logger.error("TASK_RUN_COMPLETED_WITH_ACK: failed to send with backoff", result); + return; + } + + logger.log("TASK_RUN_COMPLETED_WITH_ACK: sent with backoff", result); + } }; if (completion.ok) { - completeWithoutCheckpoint(true); + await completeWithoutCheckpoint(true); return; } @@ -740,17 +810,17 @@ class TaskCoordinator { completion.error.type === "INTERNAL_ERROR" && completion.error.code === "TASK_RUN_CANCELLED" ) { - completeWithoutCheckpoint(true); + await completeWithoutCheckpoint(true); return; } if (completion.retry === undefined) { - completeWithoutCheckpoint(true); + await completeWithoutCheckpoint(true); return; } if (completion.retry.delay < this.#delayThresholdInMs) { - completeWithoutCheckpoint(false); + await completeWithoutCheckpoint(false); // Prevents runs that fail fast from never sending a heartbeat this.#sendRunHeartbeat(socket.data.runId); @@ -759,7 +829,7 @@ class TaskCoordinator { } if (message.version === "v2") { - completeWithoutCheckpoint(true); + await completeWithoutCheckpoint(true); return; } @@ -768,7 +838,7 @@ class TaskCoordinator { const willCheckpointAndRestore = canCheckpoint || willSimulate; if (!willCheckpointAndRestore) { - completeWithoutCheckpoint(false); + await completeWithoutCheckpoint(false); return; } @@ -792,7 +862,7 @@ class TaskCoordinator { if (!checkpoint) { log.error("Failed to checkpoint"); - completeWithoutCheckpoint(false); + await completeWithoutCheckpoint(false); return; } diff --git a/apps/webapp/app/v3/handleSocketIo.server.ts b/apps/webapp/app/v3/handleSocketIo.server.ts index e1c97d2e0a..d06e6240fe 100644 --- a/apps/webapp/app/v3/handleSocketIo.server.ts +++ b/apps/webapp/app/v3/handleSocketIo.server.ts @@ -136,6 +136,44 @@ function createCoordinatorNamespace(io: Server) { checkpoint: message.checkpoint, }); }, + TASK_RUN_COMPLETED_WITH_ACK: async (message) => { + try { + const completeAttempt = new CompleteAttemptService({ + supportsRetryCheckpoints: message.version === "v1", + }); + await completeAttempt.call({ + completion: message.completion, + execution: message.execution, + checkpoint: message.checkpoint, + }); + + return { + success: true, + }; + } catch (error) { + const friendlyError = + error instanceof Error + ? { + name: error.name, + message: error.message, + stack: error.stack, + } + : { + name: "UnknownError", + message: String(error), + }; + + logger.error("Error while completing attempt with ack", { + error: friendlyError, + message, + }); + + return { + success: false, + error: friendlyError, + }; + } + }, TASK_RUN_FAILED_TO_RUN: async (message) => { await sharedQueueTasks.taskRunFailed(message.completion); }, diff --git a/packages/core/src/v3/schemas/messages.ts b/packages/core/src/v3/schemas/messages.ts index 29b1b1a825..431504c56b 100644 --- a/packages/core/src/v3/schemas/messages.ts +++ b/packages/core/src/v3/schemas/messages.ts @@ -435,6 +435,20 @@ export const CoordinatorToPlatformMessages = { .optional(), }), }, + TASK_RUN_COMPLETED_WITH_ACK: { + message: z.object({ + version: z.enum(["v1", "v2"]).default("v2"), + execution: ProdTaskRunExecution, + completion: TaskRunExecutionResult, + checkpoint: z + .object({ + docker: z.boolean(), + location: z.string(), + }) + .optional(), + }), + callback: AckCallbackResult, + }, TASK_RUN_FAILED_TO_RUN: { message: z.object({ version: z.literal("v1").default("v1"), diff --git a/packages/core/src/v3/zodSocket.ts b/packages/core/src/v3/zodSocket.ts index 66d45ef120..160620c42c 100644 --- a/packages/core/src/v3/zodSocket.ts +++ b/packages/core/src/v3/zodSocket.ts @@ -293,7 +293,8 @@ export class ZodSocketMessageSender>( type: K, - payload: z.input> + payload: z.input>, + timeout?: number ): Promise>> { const schema = this.#schema[type]?.["message"]; @@ -307,8 +308,10 @@ export class ZodSocketMessageSender