From 623f1582e1b55ad3991e12c8e6b58ef47291b214 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 17 Oct 2024 09:31:51 +0100 Subject: [PATCH 01/19] try to correct resume messages with missing checkpoint --- .../v3/marqs/sharedQueueConsumer.server.ts | 90 +++++++++++++++++-- .../v3/services/restoreCheckpoint.server.ts | 20 +++++ 2 files changed, 101 insertions(+), 9 deletions(-) diff --git a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts index a663011e4a..857a709d2a 100644 --- a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts @@ -43,7 +43,7 @@ import { generateJWTTokenForEnvironment } from "~/services/apiAuth.server"; import { EnvironmentVariable } from "../environmentVariables/repository"; import { machinePresetFromConfig } from "../machinePresets.server"; import { env } from "~/env.server"; -import { isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus"; +import { FINAL_RUN_STATUSES, isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus"; import { getMaxDuration } from "../utils/maxDuration"; const WithTraceContext = z.object({ @@ -620,6 +620,9 @@ export class SharedQueueConsumer { const resumableRun = await prisma.taskRun.findUnique({ where: { id: message.messageId, + status: { + notIn: FINAL_RUN_STATUSES, + }, }, }); @@ -633,6 +636,14 @@ export class SharedQueueConsumer { return; } + if (resumableRun.status !== "EXECUTING") { + logger.warn("Run is not executing, will try to resume anyway", { + queueMessage: message.data, + messageId: message.messageId, + runStatus: resumableRun.status, + }); + } + const resumableAttempt = await prisma.taskRunAttempt.findUnique({ where: { id: messageBody.data.resumableAttemptId, @@ -740,7 +751,11 @@ export class SharedQueueConsumer { executions, }; - logger.debug("Broadcasting RESUME_AFTER_DEPENDENCY_WITH_ACK", { resumeMessage, message }); + logger.debug("Broadcasting RESUME_AFTER_DEPENDENCY_WITH_ACK", { + resumeMessage, + message, + resumableRun, + }); // The attempt should still be running so we can broadcast to all coordinators to resume immediately const responses = await socketIo.coordinatorNamespace @@ -763,15 +778,72 @@ export class SharedQueueConsumer { } const hasSuccess = responses.some((response) => response.success); - if (!hasSuccess) { - logger.warn("RESUME_AFTER_DEPENDENCY_WITH_ACK failed", { - resumeMessage, - responses, - message, - }); - await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 5_000); + + if (hasSuccess) { + this.#doMoreWork(); return; } + + // No coordinator was able to resume the run + logger.warn("RESUME_AFTER_DEPENDENCY_WITH_ACK failed", { + resumeMessage, + responses, + message, + }); + + // Let's check if the run is frozen + if (resumableRun.status === "WAITING_TO_RESUME") { + logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK run is waiting to be restored", { + queueMessage: message.data, + messageId: message.messageId, + }); + + try { + const restoreService = new RestoreCheckpointService(); + + const checkpointEvent = await restoreService.getLastCheckpointEventIfUnrestored( + resumableRun.id + ); + + if (checkpointEvent) { + // The last checkpoint hasn't been restored yet, so restore it + const checkpoint = await restoreService.call({ + eventId: checkpointEvent.id, + }); + + if (!checkpoint) { + logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK failed to restore checkpoint", { + queueMessage: message.data, + messageId: message.messageId, + }); + + await this.#ackAndDoMoreWork(message.messageId); + return; + } + + this.#doMoreWork(); + return; + } + } catch (e) { + if (e instanceof Error) { + this._currentSpan?.recordException(e); + } else { + this._currentSpan?.recordException(new Error(String(e))); + } + + this._endSpanInNextIteration = true; + + await this.#nackAndDoMoreWork( + message.messageId, + this._options.nextTickInterval, + 5_000 + ); + return; + } + } + + await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 5_000); + return; } catch (e) { if (e instanceof Error) { this._currentSpan?.recordException(e); diff --git a/apps/webapp/app/v3/services/restoreCheckpoint.server.ts b/apps/webapp/app/v3/services/restoreCheckpoint.server.ts index 3ca0efbc33..9a9b82210a 100644 --- a/apps/webapp/app/v3/services/restoreCheckpoint.server.ts +++ b/apps/webapp/app/v3/services/restoreCheckpoint.server.ts @@ -112,4 +112,24 @@ export class RestoreCheckpointService extends BaseService { return checkpoint; } + + async getLastCheckpointEventIfUnrestored(runId: string) { + const event = await this._prisma.checkpointRestoreEvent.findFirst({ + where: { + runId, + }, + take: 1, + orderBy: { + createdAt: "desc", + }, + }); + + if (!event) { + return; + } + + if (event.type === "CHECKPOINT") { + return event; + } + } } From 11066b4e740dd0e67b69db7f507b20c4895328b6 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 17 Oct 2024 10:31:57 +0100 Subject: [PATCH 02/19] prevent creating checkpoints for outdated task waits --- .../v3/services/createCheckpoint.server.ts | 52 +++++++++++++++++-- 1 file changed, 49 insertions(+), 3 deletions(-) diff --git a/apps/webapp/app/v3/services/createCheckpoint.server.ts b/apps/webapp/app/v3/services/createCheckpoint.server.ts index 7290424248..b95843c065 100644 --- a/apps/webapp/app/v3/services/createCheckpoint.server.ts +++ b/apps/webapp/app/v3/services/createCheckpoint.server.ts @@ -35,7 +35,16 @@ export class CreateCheckpointService extends BaseService { friendlyId: params.attemptFriendlyId, }, include: { - taskRun: true, + taskRun: { + include: { + childRuns: { + orderBy: { + createdAt: "asc", + }, + take: 1, + }, + }, + }, backgroundWorker: { select: { id: true, @@ -93,6 +102,45 @@ export class CreateCheckpointService extends BaseService { }; } + const { reason } = params; + + switch (reason.type) { + case "WAIT_FOR_TASK": { + const lastChildRun = attempt.taskRun.childRuns[0]; + + if (!lastChildRun) { + logger.warn("CreateCheckpointService: No child runs, creating checkpoint regardless", { + attemptId: attempt.id, + runId: attempt.taskRunId, + params, + }); + + break; + } + + if (lastChildRun.friendlyId !== reason.friendlyId) { + logger.error("CreateCheckpointService: Checkpoint not for most recent child run", { + attemptId: attempt.id, + runId: attempt.taskRunId, + params, + }); + + return { + success: false, + keepRunAlive: true, + }; + } + + break; + } + case "WAIT_FOR_BATCH": { + break; + } + default: { + break; + } + } + //sleep to test slow checkpoints // await new Promise((resolve) => setTimeout(resolve, 60_000)); @@ -128,8 +176,6 @@ export class CreateCheckpointService extends BaseService { }, }); - const { reason } = params; - let checkpointEvent: CheckpointRestoreEvent | undefined; switch (reason.type) { From f2b5c2ac42d52d3ab103ccc4b8e1f57c15b4e6e6 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 17 Oct 2024 10:38:09 +0100 Subject: [PATCH 03/19] prevent creating checkpoints for outdated batch waits --- .../v3/services/createCheckpoint.server.ts | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/apps/webapp/app/v3/services/createCheckpoint.server.ts b/apps/webapp/app/v3/services/createCheckpoint.server.ts index b95843c065..2d7d627a64 100644 --- a/apps/webapp/app/v3/services/createCheckpoint.server.ts +++ b/apps/webapp/app/v3/services/createCheckpoint.server.ts @@ -122,6 +122,8 @@ export class CreateCheckpointService extends BaseService { logger.error("CreateCheckpointService: Checkpoint not for most recent child run", { attemptId: attempt.id, runId: attempt.taskRunId, + lastChild: lastChildRun.friendlyId, + checkpointFor: reason.friendlyId, params, }); @@ -134,6 +136,33 @@ export class CreateCheckpointService extends BaseService { break; } case "WAIT_FOR_BATCH": { + const lastChildRun = attempt.taskRun.childRuns[0]; + + if (!lastChildRun) { + logger.warn("CreateCheckpointService: No child runs, creating checkpoint regardless", { + attemptId: attempt.id, + runId: attempt.taskRunId, + params, + }); + + break; + } + + if (!reason.runFriendlyIds.includes(lastChildRun.friendlyId)) { + logger.error("CreateCheckpointService: Checkpoint not for most recent batch", { + attemptId: attempt.id, + runId: attempt.taskRunId, + lastChild: lastChildRun.friendlyId, + checkpointFor: reason.runFriendlyIds, + params, + }); + + return { + success: false, + keepRunAlive: true, + }; + } + break; } default: { From d756a16e3d1864c55b29d72ee0d7f3bb98d3549a Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 17 Oct 2024 12:26:30 +0100 Subject: [PATCH 04/19] use heartbeats to check for and clean up any leftover containers --- .../v3/marqs/sharedQueueConsumer.server.ts | 66 ++++++++++++++++++- apps/webapp/app/v3/requeueTaskRun.server.ts | 39 ++++++++++- 2 files changed, 101 insertions(+), 4 deletions(-) diff --git a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts index 857a709d2a..5d4bcadacd 100644 --- a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts @@ -21,7 +21,7 @@ import { TaskRunStatus, } from "@trigger.dev/database"; import { z } from "zod"; -import { prisma } from "~/db.server"; +import { $replica, prisma } from "~/db.server"; import { findEnvironmentById } from "~/models/runtimeEnvironment.server"; import { logger } from "~/services/logger.server"; import { singleton } from "~/utils/singleton"; @@ -1297,13 +1297,13 @@ class SharedQueueTasks { return; } - await marqs?.heartbeatMessage(taskRunAttempt.taskRunId); + await this.#heartbeat(taskRunAttempt.taskRunId); } async taskRunHeartbeat(runId: string) { logger.debug("[SharedQueueConsumer] taskRunHeartbeat()", { runId }); - await marqs?.heartbeatMessage(runId); + await this.#heartbeat(runId); } public async taskRunFailed(completion: TaskRunFailedExecutionResult) { @@ -1314,6 +1314,66 @@ class SharedQueueTasks { await service.call(completion.id, completion); } + async #heartbeat(runId: string) { + await marqs?.heartbeatMessage(runId); + + try { + // There can be a lot of calls per minute and the data doesn't have to be accurate, so use the read replica + const taskRun = await $replica.taskRun.findFirst({ + where: { + id: runId, + }, + select: { + id: true, + status: true, + runtimeEnvironment: { + select: { + type: true, + }, + }, + lockedToVersion: { + select: { + supportsLazyAttempts: true, + }, + }, + }, + }); + + if (!taskRun) { + logger.error("SharedQueueTasks.#heartbeat: Task run not found", { + runId, + }); + + return; + } + + if (taskRun.runtimeEnvironment.type === "DEVELOPMENT") { + return; + } + + if (isFinalRunStatus(taskRun.status)) { + logger.debug("SharedQueueTasks.#heartbeat: Task run is in final status", { + runId, + status: taskRun.status, + }); + + // Signal to exit any leftover containers + socketIo.coordinatorNamespace.emit("REQUEST_RUN_CANCELLATION", { + version: "v1", + runId: taskRun.id, + // Give the run a few seconds to exit to complete any flushing etc + delayInMs: taskRun.lockedToVersion?.supportsLazyAttempts ? 5_000 : undefined, + }); + return; + } + } catch (error) { + logger.error("SharedQueueTasks.#heartbeat: Error signaling run cancellation", { + runId, + error: error instanceof Error ? error.message : error, + }); + } + } + async #buildEnvironmentVariables( environment: RuntimeEnvironment, runId: string, diff --git a/apps/webapp/app/v3/requeueTaskRun.server.ts b/apps/webapp/app/v3/requeueTaskRun.server.ts index 8228829e87..005eadbf6a 100644 --- a/apps/webapp/app/v3/requeueTaskRun.server.ts +++ b/apps/webapp/app/v3/requeueTaskRun.server.ts @@ -6,11 +6,29 @@ import { FailedTaskRunService } from "./failedTaskRun.server"; import { BaseService } from "./services/baseService.server"; import { PrismaClientOrTransaction } from "~/db.server"; import { workerQueue } from "~/services/worker.server"; +import { socketIo } from "./handleSocketIo.server"; export class RequeueTaskRunService extends BaseService { public async call(runId: string) { const taskRun = await this._prisma.taskRun.findUnique({ - where: { id: runId }, + where: { + id: runId, + }, + select: { + id: true, + friendlyId: true, + status: true, + runtimeEnvironment: { + select: { + type: true, + }, + }, + lockedToVersion: { + select: { + supportsLazyAttempts: true, + }, + }, + }, }); if (!taskRun) { @@ -76,6 +94,25 @@ export class RequeueTaskRunService extends BaseService { await marqs?.acknowledgeMessage(taskRun.id); + try { + if (taskRun.runtimeEnvironment.type === "DEVELOPMENT") { + return; + } + + // Signal to exit any leftover containers + socketIo.coordinatorNamespace.emit("REQUEST_RUN_CANCELLATION", { + version: "v1", + runId: taskRun.id, + // Give the run a few seconds to exit to complete any flushing etc + delayInMs: taskRun.lockedToVersion?.supportsLazyAttempts ? 5_000 : undefined, + }); + } catch (error) { + logger.error("[RequeueTaskRunService] Error signaling run cancellation", { + runId: taskRun.id, + error: error instanceof Error ? error.message : error, + }); + } + break; } default: { From df15d6af91b3a11d80f26d1937f8d424cb49cf20 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 17 Oct 2024 12:27:18 +0100 Subject: [PATCH 05/19] lint --- .../environmentVariablesRepository.server.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts b/apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts index b563c54767..502c49acbd 100644 --- a/apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts +++ b/apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts @@ -136,7 +136,7 @@ export class EnvironmentVariablesRepository implements Repository { try { for (const variable of values) { - const result = await $transaction(this.prismaClient, async (tx) => { + const result = await $transaction(this.prismaClient, async (tx) => { const environmentVariable = await tx.environmentVariable.upsert({ where: { projectId_key: { From e003d2551c4399e9f96072dd4203ca6766be64bd Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 17 Oct 2024 13:12:50 +0100 Subject: [PATCH 06/19] improve exec logging --- apps/coordinator/src/exec.ts | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/apps/coordinator/src/exec.ts b/apps/coordinator/src/exec.ts index d0c7745b0b..b905723c0f 100644 --- a/apps/coordinator/src/exec.ts +++ b/apps/coordinator/src/exec.ts @@ -64,7 +64,18 @@ export class Exec { command, argsRaw: args, argsTrimmed, - ...output, + globalOpts: { + trimArgs: this.trimArgs, + neverThrow: this.neverThrow, + hasAbortSignal: !!this.abortSignal, + }, + localOpts: opts, + stdout: output.stdout, + stderr: output.stderr, + pid: result.pid, + exitCode: result.exitCode, + aborted: result.aborted, + killed: result.killed, }; if (this.logOutput) { From 9af60189ef4527c9c5e411de852c3f430bbf391b Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 17 Oct 2024 16:35:38 +0100 Subject: [PATCH 07/19] improve resume attempt logs --- .../app/v3/services/resumeAttempt.server.ts | 57 +++++++++---------- 1 file changed, 27 insertions(+), 30 deletions(-) diff --git a/apps/webapp/app/v3/services/resumeAttempt.server.ts b/apps/webapp/app/v3/services/resumeAttempt.server.ts index f02d9e4ccc..a0c5b7fa18 100644 --- a/apps/webapp/app/v3/services/resumeAttempt.server.ts +++ b/apps/webapp/app/v3/services/resumeAttempt.server.ts @@ -2,23 +2,24 @@ import { CoordinatorToPlatformMessages, TaskRunExecution, TaskRunExecutionResult, - WaitReason, } from "@trigger.dev/core/v3"; import type { InferSocketMessageSchema } from "@trigger.dev/core/v3/zodSocket"; import { $transaction, PrismaClientOrTransaction } from "~/db.server"; import { logger } from "~/services/logger.server"; import { marqs } from "~/v3/marqs/index.server"; import { socketIo } from "../handleSocketIo.server"; -import { SharedQueueMessageBody, sharedQueueTasks } from "../marqs/sharedQueueConsumer.server"; +import { sharedQueueTasks } from "../marqs/sharedQueueConsumer.server"; import { BaseService } from "./baseService.server"; import { TaskRunAttempt } from "@trigger.dev/database"; import { isFinalRunStatus } from "../taskStatus"; export class ResumeAttemptService extends BaseService { + private _logger = logger; + public async call( params: InferSocketMessageSchema ): Promise { - logger.debug(`ResumeAttemptService.call()`, params); + this._logger.debug(`ResumeAttemptService.call()`, params); await $transaction(this._prisma, async (tx) => { const attempt = await tx.taskRunAttempt.findUnique({ @@ -77,16 +78,18 @@ export class ResumeAttemptService extends BaseService { }); if (!attempt) { - logger.error("Could not find attempt", { attemptFriendlyId: params.attemptFriendlyId }); + this._logger.error("Could not find attempt", params); return; } + this._logger = logger.child({ + attemptId: attempt.id, + attemptFriendlyId: attempt.friendlyId, + taskRun: attempt.taskRun, + }); + if (isFinalRunStatus(attempt.taskRun.status)) { - logger.error("Run is not resumable", { - attemptId: attempt.id, - runId: attempt.taskRunId, - status: attempt.taskRun.status, - }); + this._logger.error("Run is not resumable"); return; } @@ -94,10 +97,7 @@ export class ResumeAttemptService extends BaseService { switch (params.type) { case "WAIT_FOR_DURATION": { - logger.debug("Sending duration wait resume message", { - attemptId: attempt.id, - attemptFriendlyId: params.attemptFriendlyId, - }); + this._logger.debug("Sending duration wait resume message"); await this.#setPostResumeStatuses(attempt, tx); @@ -114,13 +114,13 @@ export class ResumeAttemptService extends BaseService { const dependentAttempt = attempt.dependencies[0].taskRun.attempts[0]; if (!dependentAttempt) { - logger.error("No dependent attempt", { attemptId: attempt.id }); + this._logger.error("No dependent attempt"); return; } completedAttemptIds = [dependentAttempt.id]; } else { - logger.error("No task dependency", { attemptId: attempt.id }); + this._logger.error("No task dependency"); return; } @@ -134,13 +134,13 @@ export class ResumeAttemptService extends BaseService { const dependentBatchItems = attempt.batchDependencies[0].items; if (!dependentBatchItems) { - logger.error("No dependent batch items", { attemptId: attempt.id }); + this._logger.error("No dependent batch items"); return; } completedAttemptIds = dependentBatchItems.map((item) => item.taskRun.attempts[0]?.id); } else { - logger.error("No batch dependency", { attemptId: attempt.id }); + this._logger.error("No batch dependency"); return; } @@ -161,7 +161,7 @@ export class ResumeAttemptService extends BaseService { tx: PrismaClientOrTransaction ) { if (completedAttemptIds.length === 0) { - logger.error("No completed attempt IDs", { attemptId: attempt.id }); + this._logger.error("No completed attempt IDs"); return; } @@ -184,23 +184,23 @@ export class ResumeAttemptService extends BaseService { }); if (!completedAttempt) { - logger.error("Completed attempt not found", { - attemptId: attempt.id, - completedAttemptId, - }); + this._logger.error("Completed attempt not found", { completedAttemptId }); await marqs?.acknowledgeMessage(attempt.taskRunId); return; } + const logger = this._logger.child({ + completedAttemptId: completedAttempt.id, + completedAttemptFriendlyId: completedAttempt.friendlyId, + completedRunId: completedAttempt.taskRunId, + }); + const completion = await sharedQueueTasks.getCompletionPayloadFromAttempt( completedAttempt.id ); if (!completion) { - logger.error("Failed to get completion payload", { - attemptId: attempt.id, - completedAttemptId, - }); + logger.error("Failed to get completion payload"); await marqs?.acknowledgeMessage(attempt.taskRunId); return; } @@ -212,10 +212,7 @@ export class ResumeAttemptService extends BaseService { ); if (!executionPayload) { - logger.error("Failed to get execution payload", { - attemptId: attempt.id, - completedAttemptId, - }); + logger.error("Failed to get execution payload"); await marqs?.acknowledgeMessage(attempt.taskRunId); return; } From 4c8618d18e304ded1e5f751e07c5b6ebd1710a9b Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 17 Oct 2024 16:40:31 +0100 Subject: [PATCH 08/19] fix for resuming parents of canceled child runs --- apps/webapp/app/v3/handleSocketIo.server.ts | 5 +- .../v3/marqs/sharedQueueConsumer.server.ts | 77 +++++++++++-------- .../app/v3/services/resumeAttempt.server.ts | 7 +- 3 files changed, 55 insertions(+), 34 deletions(-) diff --git a/apps/webapp/app/v3/handleSocketIo.server.ts b/apps/webapp/app/v3/handleSocketIo.server.ts index 065d2bc168..dd12d69bb4 100644 --- a/apps/webapp/app/v3/handleSocketIo.server.ts +++ b/apps/webapp/app/v3/handleSocketIo.server.ts @@ -195,7 +195,10 @@ function createCoordinatorNamespace(io: Server) { const service = new CreateTaskRunAttemptService(); const { attempt } = await service.call(message.runId, environment, false); - const payload = await sharedQueueTasks.getExecutionPayloadFromAttempt(attempt.id, true); + const payload = await sharedQueueTasks.getExecutionPayloadFromAttempt({ + id: attempt.id, + setToExecuting: true, + }); if (!payload) { logger.error("Failed to retrieve payload after attempt creation", message); diff --git a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts index 5d4bcadacd..12f6ffd68a 100644 --- a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts @@ -43,7 +43,12 @@ import { generateJWTTokenForEnvironment } from "~/services/apiAuth.server"; import { EnvironmentVariable } from "../environmentVariables/repository"; import { machinePresetFromConfig } from "../machinePresets.server"; import { env } from "~/env.server"; -import { FINAL_RUN_STATUSES, isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus"; +import { + FINAL_ATTEMPT_STATUSES, + FINAL_RUN_STATUSES, + isFinalAttemptStatus, + isFinalRunStatus, +} from "../taskStatus"; import { getMaxDuration } from "../utils/maxDuration"; const WithTraceContext = z.object({ @@ -729,9 +734,9 @@ export class SharedQueueConsumer { completions.push(completion); - const executionPayload = await this._tasks.getExecutionPayloadFromAttempt( - completedAttempt.id - ); + const executionPayload = await this._tasks.getExecutionPayloadFromAttempt({ + id: completedAttempt.id, + }); if (!executionPayload) { await this.#ackAndDoMoreWork(message.messageId); @@ -968,7 +973,7 @@ class SharedQueueTasks { where: { id, status: { - in: ["COMPLETED", "FAILED"], + in: FINAL_ATTEMPT_STATUSES, }, }, include: { @@ -1014,11 +1019,17 @@ class SharedQueueTasks { } } - async getExecutionPayloadFromAttempt( - id: string, - setToExecuting?: boolean, - isRetrying?: boolean - ): Promise { + async getExecutionPayloadFromAttempt({ + id, + setToExecuting, + isRetrying, + skipStatusChecks, + }: { + id: string; + setToExecuting?: boolean; + isRetrying?: boolean; + skipStatusChecks?: boolean; + }): Promise { const attempt = await prisma.taskRunAttempt.findUnique({ where: { id, @@ -1051,27 +1062,29 @@ class SharedQueueTasks { return; } - switch (attempt.status) { - case "CANCELED": - case "EXECUTING": { - logger.error("Invalid attempt status for execution payload retrieval", { - attemptId: id, - status: attempt.status, - }); - return; + if (!skipStatusChecks) { + switch (attempt.status) { + case "CANCELED": + case "EXECUTING": { + logger.error("Invalid attempt status for execution payload retrieval", { + attemptId: id, + status: attempt.status, + }); + return; + } } - } - switch (attempt.taskRun.status) { - case "CANCELED": - case "EXECUTING": - case "INTERRUPTED": { - logger.error("Invalid run status for execution payload retrieval", { - attemptId: id, - runId: attempt.taskRunId, - status: attempt.taskRun.status, - }); - return; + switch (attempt.taskRun.status) { + case "CANCELED": + case "EXECUTING": + case "INTERRUPTED": { + logger.error("Invalid run status for execution payload retrieval", { + attemptId: id, + runId: attempt.taskRunId, + status: attempt.taskRun.status, + }); + return; + } } } @@ -1222,7 +1235,11 @@ class SharedQueueTasks { return; } - return this.getExecutionPayloadFromAttempt(latestAttempt.id, setToExecuting, isRetrying); + return this.getExecutionPayloadFromAttempt({ + id: latestAttempt.id, + setToExecuting, + isRetrying, + }); } async getLazyAttemptPayload( diff --git a/apps/webapp/app/v3/services/resumeAttempt.server.ts b/apps/webapp/app/v3/services/resumeAttempt.server.ts index a0c5b7fa18..0cba99e472 100644 --- a/apps/webapp/app/v3/services/resumeAttempt.server.ts +++ b/apps/webapp/app/v3/services/resumeAttempt.server.ts @@ -207,9 +207,10 @@ export class ResumeAttemptService extends BaseService { completions.push(completion); - const executionPayload = await sharedQueueTasks.getExecutionPayloadFromAttempt( - completedAttempt.id - ); + const executionPayload = await sharedQueueTasks.getExecutionPayloadFromAttempt({ + id: completedAttempt.id, + skipStatusChecks: true, // already checked when getting the completion + }); if (!executionPayload) { logger.error("Failed to get execution payload"); From 12ad920360c33df71d42ae855ac9bc2c8bfd510a Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 17 Oct 2024 18:05:02 +0100 Subject: [PATCH 09/19] separate SIGTERM from maybe OOM errors --- packages/core/src/v3/errors.ts | 50 ++++++++++++++++++++++---- packages/core/src/v3/schemas/common.ts | 2 ++ 2 files changed, 46 insertions(+), 6 deletions(-) diff --git a/packages/core/src/v3/errors.ts b/packages/core/src/v3/errors.ts index f63f48b99f..e06e97a90e 100644 --- a/packages/core/src/v3/errors.ts +++ b/packages/core/src/v3/errors.ts @@ -331,6 +331,14 @@ const prettyInternalErrors: Partial< href: links.docs.machines.home, }, }, + TASK_PROCESS_SIGTERM: { + message: + "Your task exited after receiving SIGTERM but we don't know why. If this keeps happening, please get in touch so we can investigate.", + link: { + name: "Contact us", + href: links.site.contact, + }, + }, }; type EnhanceError = T & { @@ -342,6 +350,14 @@ export function taskRunErrorEnhancer(error: TaskRunError): EnhanceError Date: Thu, 17 Oct 2024 19:15:46 +0100 Subject: [PATCH 10/19] pretty errors can have magic dashboard links --- .../route.tsx | 34 +++++++++++++++---- packages/core/src/v3/errors.ts | 21 +++++++++--- 2 files changed, 44 insertions(+), 11 deletions(-) diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx index 9db878ea6c..b250a88017 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx @@ -1,4 +1,10 @@ -import { CheckIcon, ClockIcon, CloudArrowDownIcon, QueueListIcon } from "@heroicons/react/20/solid"; +import { + CheckIcon, + ClockIcon, + CloudArrowDownIcon, + EnvelopeIcon, + QueueListIcon, +} from "@heroicons/react/20/solid"; import { Link } from "@remix-run/react"; import { LoaderFunctionArgs } from "@remix-run/server-runtime"; import { @@ -13,6 +19,7 @@ import { typedjson, useTypedFetcher } from "remix-typedjson"; import { ExitIcon } from "~/assets/icons/ExitIcon"; import { CodeBlock } from "~/components/code/CodeBlock"; import { EnvironmentLabel } from "~/components/environments/EnvironmentLabel"; +import { Feedback } from "~/components/Feedback"; import { Button, LinkButton } from "~/components/primitives/Buttons"; import { Callout } from "~/components/primitives/Callout"; import { DateTime, DateTimeAccurate } from "~/components/primitives/DateTime"; @@ -963,11 +970,26 @@ function RunError({ error }: { error: TaskRunError }) {
{name} {enhancedError.message && {enhancedError.message}} - {enhancedError.link && ( - - {enhancedError.link.name} - - )} + {enhancedError.link && + (enhancedError.link.magic === "CONTACT_FORM" ? ( + + {enhancedError.link.name} + + } + /> + ) : ( + + {enhancedError.link.name} + + ))} {enhancedError.stackTrace && ( = T & { link?: ErrorLink }; + const prettyInternalErrors: Partial< - Record + Record< + TaskRunInternalError["code"], + { + message: string; + link?: ErrorLink; + } + > > = { TASK_PROCESS_OOM_KILLED: { message: @@ -337,14 +351,11 @@ const prettyInternalErrors: Partial< link: { name: "Contact us", href: links.site.contact, + magic: "CONTACT_FORM", }, }, }; -type EnhanceError = T & { - link?: { name: string; href: string }; -}; - export function taskRunErrorEnhancer(error: TaskRunError): EnhanceError { switch (error.type) { case "BUILT_IN_ERROR": { From 2d84b7cdc41ba9c9e10fbccbddb00219b6486aa0 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 18 Oct 2024 00:34:40 +0100 Subject: [PATCH 11/19] prevent uncancellable checkpoints --- apps/coordinator/src/checkpointer.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/apps/coordinator/src/checkpointer.ts b/apps/coordinator/src/checkpointer.ts index f1b55fa066..bf82a6702c 100644 --- a/apps/coordinator/src/checkpointer.ts +++ b/apps/coordinator/src/checkpointer.ts @@ -436,7 +436,10 @@ export class Checkpointer { this.#logger.error("Error during cleanup", { ...metadata, error }); } - this.#abortControllers.delete(runId); + // Ensure only the current controller is removed + if (this.#abortControllers.get(runId) === controller) { + this.#abortControllers.delete(runId); + } controller.signal.removeEventListener("abort", onAbort); }; From 34d9759760015cc0498e2ed8920795e3ce5c1a1f Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 18 Oct 2024 10:28:47 +0100 Subject: [PATCH 12/19] simplify task run error code enum export --- packages/core/src/v3/schemas/common.ts | 26 ++------------------------ 1 file changed, 2 insertions(+), 24 deletions(-) diff --git a/packages/core/src/v3/schemas/common.ts b/packages/core/src/v3/schemas/common.ts index f05ea70675..6c56be3526 100644 --- a/packages/core/src/v3/schemas/common.ts +++ b/packages/core/src/v3/schemas/common.ts @@ -77,30 +77,6 @@ export const TaskRunStringError = z.object({ export type TaskRunStringError = z.infer; -export const TaskRunErrorCodes = { - COULD_NOT_FIND_EXECUTOR: "COULD_NOT_FIND_EXECUTOR", - COULD_NOT_FIND_TASK: "COULD_NOT_FIND_TASK", - COULD_NOT_IMPORT_TASK: "COULD_NOT_IMPORT_TASK", - CONFIGURED_INCORRECTLY: "CONFIGURED_INCORRECTLY", - TASK_ALREADY_RUNNING: "TASK_ALREADY_RUNNING", - TASK_EXECUTION_FAILED: "TASK_EXECUTION_FAILED", - TASK_EXECUTION_ABORTED: "TASK_EXECUTION_ABORTED", - TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE: "TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE", - TASK_PROCESS_SIGKILL_TIMEOUT: "TASK_PROCESS_SIGKILL_TIMEOUT", - TASK_PROCESS_SIGTERM: "TASK_PROCESS_SIGTERM", - TASK_PROCESS_OOM_KILLED: "TASK_PROCESS_OOM_KILLED", - TASK_PROCESS_MAYBE_OOM_KILLED: "TASK_PROCESS_MAYBE_OOM_KILLED", - TASK_RUN_CANCELLED: "TASK_RUN_CANCELLED", - TASK_OUTPUT_ERROR: "TASK_OUTPUT_ERROR", - HANDLE_ERROR_ERROR: "HANDLE_ERROR_ERROR", - GRACEFUL_EXIT_TIMEOUT: "GRACEFUL_EXIT_TIMEOUT", - TASK_RUN_CRASHED: "TASK_RUN_CRASHED", - MAX_DURATION_EXCEEDED: "MAX_DURATION_EXCEEDED", - DISK_SPACE_EXCEEDED: "DISK_SPACE_EXCEEDED", - POD_EVICTED: "POD_EVICTED", - POD_UNKNOWN_ERROR: "POD_UNKNOWN_ERROR", -} as const; - export const TaskRunInternalError = z.object({ type: z.literal("INTERNAL_ERROR"), code: z.enum([ @@ -133,6 +109,8 @@ export const TaskRunInternalError = z.object({ export type TaskRunInternalError = z.infer; +export const TaskRunErrorCodes = TaskRunInternalError.shape.code.enum; + export const TaskRunError = z.discriminatedUnion("type", [ TaskRunBuiltInError, TaskRunCustomErrorObject, From 89ec5c8bfd4098b55312960f6f21d93a31a0f561 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 18 Oct 2024 12:07:19 +0100 Subject: [PATCH 13/19] grab the last, not the first child run --- apps/webapp/app/v3/services/createCheckpoint.server.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/webapp/app/v3/services/createCheckpoint.server.ts b/apps/webapp/app/v3/services/createCheckpoint.server.ts index 2d7d627a64..f0671abda3 100644 --- a/apps/webapp/app/v3/services/createCheckpoint.server.ts +++ b/apps/webapp/app/v3/services/createCheckpoint.server.ts @@ -39,7 +39,7 @@ export class CreateCheckpointService extends BaseService { include: { childRuns: { orderBy: { - createdAt: "asc", + createdAt: "desc", }, take: 1, }, From 5c262fdbe3a45bf380e592bc8c3103f15bed67d4 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 18 Oct 2024 16:04:41 +0100 Subject: [PATCH 14/19] Revert "prevent creating checkpoints for outdated batch waits" This reverts commit f2b5c2ac42d52d3ab103ccc4b8e1f57c15b4e6e6. --- .../v3/services/createCheckpoint.server.ts | 29 ------------------- 1 file changed, 29 deletions(-) diff --git a/apps/webapp/app/v3/services/createCheckpoint.server.ts b/apps/webapp/app/v3/services/createCheckpoint.server.ts index f0671abda3..2696054f18 100644 --- a/apps/webapp/app/v3/services/createCheckpoint.server.ts +++ b/apps/webapp/app/v3/services/createCheckpoint.server.ts @@ -122,8 +122,6 @@ export class CreateCheckpointService extends BaseService { logger.error("CreateCheckpointService: Checkpoint not for most recent child run", { attemptId: attempt.id, runId: attempt.taskRunId, - lastChild: lastChildRun.friendlyId, - checkpointFor: reason.friendlyId, params, }); @@ -136,33 +134,6 @@ export class CreateCheckpointService extends BaseService { break; } case "WAIT_FOR_BATCH": { - const lastChildRun = attempt.taskRun.childRuns[0]; - - if (!lastChildRun) { - logger.warn("CreateCheckpointService: No child runs, creating checkpoint regardless", { - attemptId: attempt.id, - runId: attempt.taskRunId, - params, - }); - - break; - } - - if (!reason.runFriendlyIds.includes(lastChildRun.friendlyId)) { - logger.error("CreateCheckpointService: Checkpoint not for most recent batch", { - attemptId: attempt.id, - runId: attempt.taskRunId, - lastChild: lastChildRun.friendlyId, - checkpointFor: reason.runFriendlyIds, - params, - }); - - return { - success: false, - keepRunAlive: true, - }; - } - break; } default: { From e6afbb4d8bd606bbfec1b875e8d2734e5257fa57 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 18 Oct 2024 16:05:33 +0100 Subject: [PATCH 15/19] Revert "grab the last, not the first child run" This reverts commit 89ec5c8bfd4098b55312960f6f21d93a31a0f561. --- apps/webapp/app/v3/services/createCheckpoint.server.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/webapp/app/v3/services/createCheckpoint.server.ts b/apps/webapp/app/v3/services/createCheckpoint.server.ts index 2696054f18..b95843c065 100644 --- a/apps/webapp/app/v3/services/createCheckpoint.server.ts +++ b/apps/webapp/app/v3/services/createCheckpoint.server.ts @@ -39,7 +39,7 @@ export class CreateCheckpointService extends BaseService { include: { childRuns: { orderBy: { - createdAt: "desc", + createdAt: "asc", }, take: 1, }, From 40d80f9bfd41beb4e97d143309725280395fc13a Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 18 Oct 2024 16:05:42 +0100 Subject: [PATCH 16/19] Revert "prevent creating checkpoints for outdated task waits" This reverts commit 11066b4e740dd0e67b69db7f507b20c4895328b6. --- .../v3/services/createCheckpoint.server.ts | 52 ++----------------- 1 file changed, 3 insertions(+), 49 deletions(-) diff --git a/apps/webapp/app/v3/services/createCheckpoint.server.ts b/apps/webapp/app/v3/services/createCheckpoint.server.ts index b95843c065..7290424248 100644 --- a/apps/webapp/app/v3/services/createCheckpoint.server.ts +++ b/apps/webapp/app/v3/services/createCheckpoint.server.ts @@ -35,16 +35,7 @@ export class CreateCheckpointService extends BaseService { friendlyId: params.attemptFriendlyId, }, include: { - taskRun: { - include: { - childRuns: { - orderBy: { - createdAt: "asc", - }, - take: 1, - }, - }, - }, + taskRun: true, backgroundWorker: { select: { id: true, @@ -102,45 +93,6 @@ export class CreateCheckpointService extends BaseService { }; } - const { reason } = params; - - switch (reason.type) { - case "WAIT_FOR_TASK": { - const lastChildRun = attempt.taskRun.childRuns[0]; - - if (!lastChildRun) { - logger.warn("CreateCheckpointService: No child runs, creating checkpoint regardless", { - attemptId: attempt.id, - runId: attempt.taskRunId, - params, - }); - - break; - } - - if (lastChildRun.friendlyId !== reason.friendlyId) { - logger.error("CreateCheckpointService: Checkpoint not for most recent child run", { - attemptId: attempt.id, - runId: attempt.taskRunId, - params, - }); - - return { - success: false, - keepRunAlive: true, - }; - } - - break; - } - case "WAIT_FOR_BATCH": { - break; - } - default: { - break; - } - } - //sleep to test slow checkpoints // await new Promise((resolve) => setTimeout(resolve, 60_000)); @@ -176,6 +128,8 @@ export class CreateCheckpointService extends BaseService { }, }); + const { reason } = params; + let checkpointEvent: CheckpointRestoreEvent | undefined; switch (reason.type) { From 59d375b07d30b3ccc4a3e034ff784c4be8b1f383 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 18 Oct 2024 16:24:18 +0100 Subject: [PATCH 17/19] more logs for resume message handling --- .../v3/marqs/sharedQueueConsumer.server.ts | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts index 12f6ffd68a..19628db616 100644 --- a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts @@ -826,8 +826,22 @@ export class SharedQueueConsumer { return; } + logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK restored checkpoint", { + queueMessage: message.data, + messageId: message.messageId, + checkpoint, + }); + this.#doMoreWork(); return; + } else { + logger.debug( + "RESUME_AFTER_DEPENDENCY_WITH_ACK run is frozen without last checkpoint event", + { + queueMessage: message.data, + messageId: message.messageId, + } + ); } } catch (e) { if (e instanceof Error) { @@ -847,6 +861,11 @@ export class SharedQueueConsumer { } } + logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK retrying", { + queueMessage: message.data, + messageId: message.messageId, + }); + await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 5_000); return; } catch (e) { From 3604d830b9cdb6e1ea1e95bf483041bbd95ad988 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 18 Oct 2024 16:43:46 +0100 Subject: [PATCH 18/19] add magic error link comment --- packages/core/src/v3/errors.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/core/src/v3/errors.ts b/packages/core/src/v3/errors.ts index 90eb13bd40..bc6fc20e47 100644 --- a/packages/core/src/v3/errors.ts +++ b/packages/core/src/v3/errors.ts @@ -315,6 +315,7 @@ export class GracefulExitTimeoutError extends Error { type ErrorLink = { name: string; href: string; + // This allows us to easily add more complex logic on the frontend, e.g. display a button to open a contact form modal magic?: "CONTACT_FORM"; }; From cdbf5c6233c0491c7f1572fdb883418ad54ba64b Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 18 Oct 2024 16:45:36 +0100 Subject: [PATCH 19/19] add changeset --- .changeset/many-plants-destroy.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/many-plants-destroy.md diff --git a/.changeset/many-plants-destroy.md b/.changeset/many-plants-destroy.md new file mode 100644 index 0000000000..a59a5f385c --- /dev/null +++ b/.changeset/many-plants-destroy.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/core": patch +--- + +SIGTERM detection and prettier errors