From 938c5a1fcd9e62e86a670edb57c4571b0b619b70 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 17 Feb 2025 15:50:31 +0000 Subject: [PATCH 1/6] add ability to delay checkpoints --- apps/coordinator/src/checkpointer.ts | 56 +++++++++++++++++++--------- 1 file changed, 38 insertions(+), 18 deletions(-) diff --git a/apps/coordinator/src/checkpointer.ts b/apps/coordinator/src/checkpointer.ts index c52060e9c6..58aaa7d696 100644 --- a/apps/coordinator/src/checkpointer.ts +++ b/apps/coordinator/src/checkpointer.ts @@ -89,7 +89,7 @@ export class Checkpointer { #logger = new SimpleStructuredLogger("checkpointer"); #abortControllers = new Map(); #failedCheckpoints = new Map(); - #waitingForRetry = new Set(); + #waitingToCheckpoint = new Set(); private registryHost: string; private registryNamespace: string; @@ -189,7 +189,10 @@ export class Checkpointer { } } - async checkpointAndPush(opts: CheckpointAndPushOptions): Promise { + async checkpointAndPush( + opts: CheckpointAndPushOptions, + delayMs?: number + ): Promise { const start = performance.now(); this.#logger.log(`checkpointAndPush() start`, { start, opts }); @@ -203,7 +206,7 @@ export class Checkpointer { } try { - const result = await this.#checkpointAndPushWithBackoff(opts); + const result = await this.#checkpointAndPushWithBackoff(opts, delayMs); const end = performance.now(); this.#logger.log(`checkpointAndPush() end`, { @@ -226,8 +229,8 @@ export class Checkpointer { } } - isCheckpointing(runId: string) { - return this.#abortControllers.has(runId) || this.#waitingForRetry.has(runId); + #isCheckpointing(runId: string) { + return this.#abortControllers.has(runId) || this.#waitingToCheckpoint.has(runId); } cancelCheckpoint(runId: string): boolean { @@ -238,8 +241,8 @@ export class Checkpointer { return true; } - if (this.#waitingForRetry.has(runId)) { - this.#waitingForRetry.delete(runId); + if (this.#waitingToCheckpoint.has(runId)) { + this.#waitingToCheckpoint.delete(runId); return true; } @@ -261,13 +264,30 @@ export class Checkpointer { return true; } - async #checkpointAndPushWithBackoff({ - runId, - leaveRunning = true, // This mirrors kubernetes behaviour more accurately - projectRef, - deploymentVersion, - attemptNumber, - }: CheckpointAndPushOptions): Promise { + async #checkpointAndPushWithBackoff( + { + runId, + leaveRunning = true, // This mirrors kubernetes behaviour more accurately + projectRef, + deploymentVersion, + attemptNumber, + }: CheckpointAndPushOptions, + delayMs?: number + ): Promise { + if (delayMs && delayMs > 0) { + this.#logger.log("Delaying checkpoint", { runId, delayMs }); + + this.#waitingToCheckpoint.add(runId); + await setTimeout(delayMs); + + if (!this.#waitingToCheckpoint.has(runId)) { + this.#logger.log("Checkpoint canceled during initial delay", { runId }); + return { success: false, reason: "CANCELED" }; + } else { + this.#waitingToCheckpoint.delete(runId); + } + } + this.#logger.log("Checkpointing with backoff", { runId, leaveRunning, @@ -290,14 +310,14 @@ export class Checkpointer { delay, }); - this.#waitingForRetry.add(runId); + this.#waitingToCheckpoint.add(runId); await setTimeout(delay.milliseconds); - if (!this.#waitingForRetry.has(runId)) { + if (!this.#waitingToCheckpoint.has(runId)) { this.#logger.log("Checkpoint canceled while waiting for retry", { runId }); return { success: false, reason: "CANCELED" }; } else { - this.#waitingForRetry.delete(runId); + this.#waitingToCheckpoint.delete(runId); } } @@ -386,7 +406,7 @@ export class Checkpointer { 
return { success: false, reason: "NO_SUPPORT" }; } - if (this.isCheckpointing(runId)) { + if (this.#isCheckpointing(runId)) { this.#logger.error("Checkpoint procedure already in progress", { options }); return { success: false, reason: "IN_PROGRESS" }; } From 9a101b3bb99133656ffd8650dfe2cb179682c5cc Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 17 Feb 2025 15:51:28 +0000 Subject: [PATCH 2/6] optional checkpoint delays for dependency waits --- apps/coordinator/src/index.ts | 41 +++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/apps/coordinator/src/index.ts b/apps/coordinator/src/index.ts index aa5a6062b7..4268ced9f9 100644 --- a/apps/coordinator/src/index.ts +++ b/apps/coordinator/src/index.ts @@ -35,6 +35,11 @@ const TASK_RUN_COMPLETED_WITH_ACK_TIMEOUT_MS = const TASK_RUN_COMPLETED_WITH_ACK_MAX_RETRIES = parseInt(process.env.TASK_RUN_COMPLETED_WITH_ACK_MAX_RETRIES || "") || 7; +const WAIT_FOR_TASK_CHECKPOINT_DELAY_MS = + parseInt(process.env.WAIT_FOR_TASK_CHECKPOINT_DELAY_MS || "") || 0; +const WAIT_FOR_BATCH_CHECKPOINT_DELAY_MS = + parseInt(process.env.WAIT_FOR_BATCH_CHECKPOINT_DELAY_MS || "") || 0; + const logger = new SimpleStructuredLogger("coordinator", undefined, { nodeName: NODE_NAME }); const chaosMonkey = new ChaosMonkey( !!process.env.CHAOS_MONKEY_ENABLED, @@ -143,6 +148,7 @@ class TaskCoordinator { authToken: PLATFORM_SECRET, logHandlerPayloads: false, handlers: { + // This is used by resumeAttempt RESUME_AFTER_DEPENDENCY: async (message) => { const log = platformLogger.child({ eventName: "RESUME_AFTER_DEPENDENCY", @@ -168,11 +174,12 @@ class TaskCoordinator { await chaosMonkey.call(); - // In case the task resumed faster than we could checkpoint + // In case the task resumes before the checkpoint is created this.#cancelCheckpoint(message.runId); taskSocket.emit("RESUME_AFTER_DEPENDENCY", message); }, + // This is used by sharedQueueConsumer RESUME_AFTER_DEPENDENCY_WITH_ACK: async (message) => { const log = platformLogger.child({ eventName: "RESUME_AFTER_DEPENDENCY_WITH_ACK", @@ -218,7 +225,7 @@ class TaskCoordinator { await chaosMonkey.call(); - // In case the task resumed faster than we could checkpoint + // In case the task resumes before the checkpoint is created this.#cancelCheckpoint(message.runId); taskSocket.emit("RESUME_AFTER_DEPENDENCY", message); @@ -1096,12 +1103,15 @@ class TaskCoordinator { } } - const checkpoint = await this.#checkpointer.checkpointAndPush({ - runId: socket.data.runId, - projectRef: socket.data.projectRef, - deploymentVersion: socket.data.deploymentVersion, - attemptNumber: getAttemptNumber(), - }); + const checkpoint = await this.#checkpointer.checkpointAndPush( + { + runId: socket.data.runId, + projectRef: socket.data.projectRef, + deploymentVersion: socket.data.deploymentVersion, + attemptNumber: getAttemptNumber(), + }, + WAIT_FOR_TASK_CHECKPOINT_DELAY_MS + ); if (!checkpoint) { log.error("Failed to checkpoint"); @@ -1189,12 +1199,15 @@ class TaskCoordinator { } } - const checkpoint = await this.#checkpointer.checkpointAndPush({ - runId: socket.data.runId, - projectRef: socket.data.projectRef, - deploymentVersion: socket.data.deploymentVersion, - attemptNumber: getAttemptNumber(), - }); + const checkpoint = await this.#checkpointer.checkpointAndPush( + { + runId: socket.data.runId, + projectRef: socket.data.projectRef, + deploymentVersion: socket.data.deploymentVersion, + attemptNumber: getAttemptNumber(), + }, + WAIT_FOR_BATCH_CHECKPOINT_DELAY_MS 
+ ); if (!checkpoint) { log.error("Failed to checkpoint"); From ad9cf281a54138c87bf5fa3a7e0f90a02f59910b Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 18 Feb 2025 11:48:29 +0000 Subject: [PATCH 3/6] prevent checkpoint creation for resumed batches --- .../v3/services/createCheckpoint.server.ts | 51 ++++++++++++++++++- 1 file changed, 49 insertions(+), 2 deletions(-) diff --git a/apps/webapp/app/v3/services/createCheckpoint.server.ts b/apps/webapp/app/v3/services/createCheckpoint.server.ts index bbd8618898..2f0e755c5f 100644 --- a/apps/webapp/app/v3/services/createCheckpoint.server.ts +++ b/apps/webapp/app/v3/services/createCheckpoint.server.ts @@ -93,6 +93,55 @@ export class CreateCheckpointService extends BaseService { }; } + const { reason } = params; + + // Check if we should accept this checkpoint + switch (reason.type) { + case "MANUAL": + // Always accept manual checkpoints + break; + case "WAIT_FOR_DURATION": + // Always accept duration checkpoints + break; + case "WAIT_FOR_TASK": { + // TODO + break; + } + case "WAIT_FOR_BATCH": { + const batchRun = await this._prisma.batchTaskRun.findFirst({ + where: { + friendlyId: reason.batchFriendlyId, + }, + }); + + if (!batchRun) { + logger.error("CreateCheckpointService: Batch not found", { + batchFriendlyId: reason.batchFriendlyId, + params, + }); + + return { + success: false, + keepRunAlive: true, + }; + } + + if (batchRun.batchVersion === "v3" && batchRun.resumedAt) { + logger.error("CreateCheckpointService: Batch already resumed", { + batchRun, + params, + }); + + return { + success: false, + keepRunAlive: true, + }; + } + + break; + } + } + //sleep to test slow checkpoints // Sleep a random value between 4 and 30 seconds // await new Promise((resolve) => { @@ -146,8 +195,6 @@ export class CreateCheckpointService extends BaseService { }, }); - const { reason } = params; - let checkpointEvent: CheckpointRestoreEvent | undefined; switch (reason.type) { From 7c511863db6ccd0003764172f0b971624529d438 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 18 Feb 2025 11:49:54 +0000 Subject: [PATCH 4/6] unpause after checkpoint rejected by platform --- apps/coordinator/src/checkpointer.ts | 10 +++++++ apps/coordinator/src/index.ts | 42 ++++++++++++++++++++++++---- 2 files changed, 46 insertions(+), 6 deletions(-) diff --git a/apps/coordinator/src/checkpointer.ts b/apps/coordinator/src/checkpointer.ts index 58aaa7d696..a1ad2192c0 100644 --- a/apps/coordinator/src/checkpointer.ts +++ b/apps/coordinator/src/checkpointer.ts @@ -601,6 +601,16 @@ export class Checkpointer { } } + async unpause(runId: string, attemptNumber?: number): Promise { + try { + const containterNameWithAttempt = this.#getRunContainerName(runId, attemptNumber); + const exec = new Exec({ logger: this.#logger }); + await exec.x("docker", ["unpause", containterNameWithAttempt]); + } catch (error) { + this.#logger.error("[Docker] Error during unpause", { runId, attemptNumber, error }); + } + } + async #createDockerCheckpoint( abortSignal: AbortSignal, runId: string, diff --git a/apps/coordinator/src/index.ts b/apps/coordinator/src/index.ts index 4268ced9f9..65f9520826 100644 --- a/apps/coordinator/src/index.ts +++ b/apps/coordinator/src/index.ts @@ -1015,11 +1015,14 @@ class TaskCoordinator { return; } + const runId = socket.data.runId; + const attemptNumber = getAttemptNumber(); + const checkpoint = await this.#checkpointer.checkpointAndPush({ - runId: socket.data.runId, + runId, projectRef: 
socket.data.projectRef, deploymentVersion: socket.data.deploymentVersion, - attemptNumber: getAttemptNumber(), + attemptNumber, }); if (!checkpoint) { @@ -1045,6 +1048,13 @@ class TaskCoordinator { if (ack?.keepRunAlive) { log.log("keeping run alive after duration checkpoint"); + + if (checkpoint.docker && willSimulate) { + // The container is still paused so we need to unpause it + log.log("unpausing container after duration checkpoint"); + this.#checkpointer.unpause(runId, attemptNumber); + } + return; } @@ -1103,12 +1113,15 @@ class TaskCoordinator { } } + const runId = socket.data.runId; + const attemptNumber = getAttemptNumber(); + const checkpoint = await this.#checkpointer.checkpointAndPush( { - runId: socket.data.runId, + runId, projectRef: socket.data.projectRef, deploymentVersion: socket.data.deploymentVersion, - attemptNumber: getAttemptNumber(), + attemptNumber, }, WAIT_FOR_TASK_CHECKPOINT_DELAY_MS ); @@ -1141,6 +1154,13 @@ class TaskCoordinator { if (ack?.keepRunAlive) { socket.data.requiresCheckpointResumeWithMessage = undefined; log.log("keeping run alive after task checkpoint"); + + if (checkpoint.docker && willSimulate) { + // The container is still paused so we need to unpause it + log.log("unpausing container after duration checkpoint"); + this.#checkpointer.unpause(runId, attemptNumber); + } + return; } @@ -1199,12 +1219,15 @@ class TaskCoordinator { } } + const runId = socket.data.runId; + const attemptNumber = getAttemptNumber(); + const checkpoint = await this.#checkpointer.checkpointAndPush( { - runId: socket.data.runId, + runId, projectRef: socket.data.projectRef, deploymentVersion: socket.data.deploymentVersion, - attemptNumber: getAttemptNumber(), + attemptNumber, }, WAIT_FOR_BATCH_CHECKPOINT_DELAY_MS ); @@ -1238,6 +1261,13 @@ class TaskCoordinator { if (ack?.keepRunAlive) { socket.data.requiresCheckpointResumeWithMessage = undefined; log.log("keeping run alive after batch checkpoint"); + + if (checkpoint.docker && willSimulate) { + // The container is still paused so we need to unpause it + log.log("unpausing container after batch checkpoint"); + this.#checkpointer.unpause(runId, attemptNumber); + } + return; } From 1c33babae0028b528bc0be11c5ca63d044d68bc7 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 18 Feb 2025 13:33:52 +0000 Subject: [PATCH 5/6] prevent checkpoint creation for resumed task waits --- .../v3/services/createCheckpoint.server.ts | 56 +++++++++++++++++-- .../services/resumeTaskDependency.server.ts | 49 ++++++++++++++++ .../migration.sql | 2 + .../database/prisma/schema.prisma | 1 + 4 files changed, 102 insertions(+), 6 deletions(-) create mode 100644 internal-packages/database/prisma/migrations/20250218124428_add_resumed_at_to_task_run_dependency/migration.sql diff --git a/apps/webapp/app/v3/services/createCheckpoint.server.ts b/apps/webapp/app/v3/services/createCheckpoint.server.ts index 2f0e755c5f..e5a49d1fe4 100644 --- a/apps/webapp/app/v3/services/createCheckpoint.server.ts +++ b/apps/webapp/app/v3/services/createCheckpoint.server.ts @@ -97,14 +97,52 @@ export class CreateCheckpointService extends BaseService { // Check if we should accept this checkpoint switch (reason.type) { - case "MANUAL": + case "MANUAL": { // Always accept manual checkpoints break; - case "WAIT_FOR_DURATION": + } + case "WAIT_FOR_DURATION": { // Always accept duration checkpoints break; + } case "WAIT_FOR_TASK": { - // TODO + const childRun = await this._prisma.taskRun.findFirst({ + where: { + friendlyId: 
reason.friendlyId, + }, + select: { + dependency: { + select: { + resumedAt: true, + }, + }, + }, + }); + + if (!childRun) { + logger.error("CreateCheckpointService: Pre-check - WAIT_FOR_TASK child run not found", { + friendlyId: reason.friendlyId, + params, + }); + + return { + success: false, + keepRunAlive: false, + }; + } + + if (childRun.dependency?.resumedAt) { + logger.error("CreateCheckpointService: Child run already resumed", { + childRun, + params, + }); + + return { + success: false, + keepRunAlive: true, + }; + } + break; } case "WAIT_FOR_BATCH": { @@ -112,21 +150,24 @@ export class CreateCheckpointService extends BaseService { where: { friendlyId: reason.batchFriendlyId, }, + select: { + resumedAt: true, + }, }); if (!batchRun) { - logger.error("CreateCheckpointService: Batch not found", { + logger.error("CreateCheckpointService: Pre-check - Batch not found", { batchFriendlyId: reason.batchFriendlyId, params, }); return { success: false, - keepRunAlive: true, + keepRunAlive: false, }; } - if (batchRun.batchVersion === "v3" && batchRun.resumedAt) { + if (batchRun.resumedAt) { logger.error("CreateCheckpointService: Batch already resumed", { batchRun, params, @@ -140,6 +181,9 @@ export class CreateCheckpointService extends BaseService { break; } + default: { + break; + } } //sleep to test slow checkpoints diff --git a/apps/webapp/app/v3/services/resumeTaskDependency.server.ts b/apps/webapp/app/v3/services/resumeTaskDependency.server.ts index bdbece9778..f3e2fbc558 100644 --- a/apps/webapp/app/v3/services/resumeTaskDependency.server.ts +++ b/apps/webapp/app/v3/services/resumeTaskDependency.server.ts @@ -3,6 +3,7 @@ import { workerQueue } from "~/services/worker.server"; import { marqs } from "~/v3/marqs/index.server"; import { BaseService } from "./baseService.server"; import { logger } from "~/services/logger.server"; +import { TaskRunDependency } from "@trigger.dev/database"; export class ResumeTaskDependencyService extends BaseService { public async call(dependencyId: string, sourceTaskAttemptId: string) { @@ -49,6 +50,21 @@ export class ResumeTaskDependencyService extends BaseService { runId: dependentRun.id, } ); + + const wasUpdated = await this.#setDependencyToResumedOnce(dependency); + + if (!wasUpdated) { + logger.debug("Task dependency resume: Attempt with checkpoint was already resumed", { + attemptId: dependency.id, + dependentAttempt: dependency.dependentAttempt, + checkpointEventId: dependency.checkpointEventId, + hasCheckpointEvent: !!dependency.checkpointEventId, + runId: dependentRun.id, + }); + return; + } + + await marqs?.enqueueMessage( dependency.taskRun.runtimeEnvironment, dependentRun.queue, @@ -85,6 +101,19 @@ export class ResumeTaskDependencyService extends BaseService { return; } + const wasUpdated = await this.#setDependencyToResumedOnce(dependency); + + if (!wasUpdated) { + logger.debug("Task dependency resume: Attempt without checkpoint was already resumed", { + attemptId: dependency.id, + dependentAttempt: dependency.dependentAttempt, + checkpointEventId: dependency.checkpointEventId, + hasCheckpointEvent: !!dependency.checkpointEventId, + runId: dependentRun.id, + }); + return; + } + await marqs?.replaceMessage( dependentRun.id, { @@ -102,6 +131,26 @@ export class ResumeTaskDependencyService extends BaseService { } } + async #setDependencyToResumedOnce(dependency: TaskRunDependency) { + const result = await this._prisma.taskRunDependency.updateMany({ + where: { + id: dependency.id, + resumedAt: null, + }, + data: { + resumedAt: new Date(), + }, + }); 
+ + // Check if any records were updated + if (result.count > 0) { + // The status was changed, so we return true + return true; + } else { + return false; + } + } + static async enqueue( dependencyId: string, sourceTaskAttemptId: string, diff --git a/internal-packages/database/prisma/migrations/20250218124428_add_resumed_at_to_task_run_dependency/migration.sql b/internal-packages/database/prisma/migrations/20250218124428_add_resumed_at_to_task_run_dependency/migration.sql new file mode 100644 index 0000000000..032ff92fd4 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20250218124428_add_resumed_at_to_task_run_dependency/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "TaskRunDependency" ADD COLUMN "resumedAt" TIMESTAMP(3); diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index d676484d9b..e724ed24b9 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -1915,6 +1915,7 @@ model TaskRunDependency { createdAt DateTime @default(now()) updatedAt DateTime @updatedAt + resumedAt DateTime? @@index([dependentAttemptId]) @@index([dependentBatchRunId]) From 73bb2ddfb2f956c269e03cd6db7a8997f653fc21 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Wed, 19 Feb 2025 16:28:34 +0000 Subject: [PATCH 6/6] fix checkpoint cancellation --- apps/coordinator/src/checkpointer.ts | 225 ++++++++++++++------------- apps/coordinator/src/index.ts | 42 +++-- 2 files changed, 145 insertions(+), 122 deletions(-) diff --git a/apps/coordinator/src/checkpointer.ts b/apps/coordinator/src/checkpointer.ts index a1ad2192c0..269bf6d421 100644 --- a/apps/coordinator/src/checkpointer.ts +++ b/apps/coordinator/src/checkpointer.ts @@ -27,7 +27,7 @@ type CheckpointAndPushResult = | { success: true; checkpoint: CheckpointData } | { success: false; - reason?: "CANCELED" | "DISABLED" | "ERROR" | "IN_PROGRESS" | "NO_SUPPORT" | "SKIP_RETRYING"; + reason?: "CANCELED" | "ERROR" | "SKIP_RETRYING"; }; type CheckpointData = { @@ -87,9 +87,14 @@ export class Checkpointer { #dockerMode: boolean; #logger = new SimpleStructuredLogger("checkpointer"); - #abortControllers = new Map(); + #failedCheckpoints = new Map(); - #waitingToCheckpoint = new Set(); + + // Indexed by run ID + #runAbortControllers = new Map< + string, + { signal: AbortSignal; abort: AbortController["abort"] } + >(); private registryHost: string; private registryNamespace: string; @@ -196,25 +201,73 @@ export class Checkpointer { const start = performance.now(); this.#logger.log(`checkpointAndPush() start`, { start, opts }); - let interval: NodeJS.Timer | undefined; + const { runId } = opts; + let interval: NodeJS.Timer | undefined; if (opts.shouldHeartbeat) { interval = setInterval(() => { - this.#logger.log("Sending heartbeat", { runId: opts.runId }); - this.opts.heartbeat(opts.runId); + this.#logger.log("Sending heartbeat", { runId }); + this.opts.heartbeat(runId); }, 20_000); } + const controller = new AbortController(); + const signal = controller.signal; + const abort = controller.abort.bind(controller); + + const onAbort = () => { + this.#logger.error("Checkpoint aborted", { runId, options: opts }); + }; + + signal.addEventListener("abort", onAbort, { once: true }); + + const removeCurrentAbortController = () => { + const controller = this.#runAbortControllers.get(runId); + + // Ensure only the current controller is removed + if (controller && controller.signal === signal) { + 
this.#runAbortControllers.delete(runId); + } + + // Remove the abort listener in case it hasn't fired + signal.removeEventListener("abort", onAbort); + }; + + if (!this.#dockerMode && !this.#canCheckpoint) { + this.#logger.error("No checkpoint support. Simulation requires docker."); + this.#failCheckpoint(runId, "NO_SUPPORT"); + return; + } + + if (this.#isRunCheckpointing(runId)) { + this.#logger.error("Checkpoint procedure already in progress", { options: opts }); + this.#failCheckpoint(runId, "IN_PROGRESS"); + return; + } + + // This is a new checkpoint, clear any last failure for this run + this.#clearFailedCheckpoint(runId); + + if (this.disableCheckpointSupport) { + this.#logger.error("Checkpoint support disabled", { options: opts }); + this.#failCheckpoint(runId, "DISABLED"); + return; + } + + this.#runAbortControllers.set(runId, { signal, abort }); + try { - const result = await this.#checkpointAndPushWithBackoff(opts, delayMs); + const result = await this.#checkpointAndPushWithBackoff(opts, { delayMs, signal }); const end = performance.now(); this.#logger.log(`checkpointAndPush() end`, { start, end, diff: end - start, + diffWithoutDelay: end - start - (delayMs ?? 0), opts, success: result.success, + delayMs, }); if (!result.success) { @@ -226,40 +279,41 @@ export class Checkpointer { if (opts.shouldHeartbeat) { clearInterval(interval); } + removeCurrentAbortController(); } } - #isCheckpointing(runId: string) { - return this.#abortControllers.has(runId) || this.#waitingToCheckpoint.has(runId); + #isRunCheckpointing(runId: string) { + return this.#runAbortControllers.has(runId); } - cancelCheckpoint(runId: string): boolean { + cancelAllCheckpointsForRun(runId: string): boolean { + this.#logger.log("cancelAllCheckpointsForRun: call", { runId }); + // If the last checkpoint failed, pretend we canceled it // This ensures tasks don't wait for external resume messages to continue if (this.#hasFailedCheckpoint(runId)) { + this.#logger.log("cancelAllCheckpointsForRun: hasFailedCheckpoint", { runId }); this.#clearFailedCheckpoint(runId); return true; } - if (this.#waitingToCheckpoint.has(runId)) { - this.#waitingToCheckpoint.delete(runId); - return true; - } - - const controller = this.#abortControllers.get(runId); + const controller = this.#runAbortControllers.get(runId); if (!controller) { - this.#logger.debug("Nothing to cancel", { runId }); + this.#logger.debug("cancelAllCheckpointsForRun: no abort controller", { runId }); return false; } - if (controller.signal.aborted) { - this.#logger.debug("Controller already aborted", { runId }); + const { abort, signal } = controller; + + if (signal.aborted) { + this.#logger.debug("cancelAllCheckpointsForRun: signal already aborted", { runId }); return false; } - controller.abort("cancelCheckpoint()"); - this.#abortControllers.delete(runId); + abort("cancelCheckpoint()"); + this.#runAbortControllers.delete(runId); return true; } @@ -272,19 +326,16 @@ export class Checkpointer { deploymentVersion, attemptNumber, }: CheckpointAndPushOptions, - delayMs?: number + { delayMs, signal }: { delayMs?: number; signal: AbortSignal } ): Promise { if (delayMs && delayMs > 0) { this.#logger.log("Delaying checkpoint", { runId, delayMs }); - this.#waitingToCheckpoint.add(runId); - await setTimeout(delayMs); - - if (!this.#waitingToCheckpoint.has(runId)) { + try { + await setTimeout(delayMs, undefined, { signal }); + } catch (error) { this.#logger.log("Checkpoint canceled during initial delay", { runId }); return { success: false, reason: "CANCELED" }; - } else { 
- this.#waitingToCheckpoint.delete(runId); } } @@ -310,24 +361,24 @@ export class Checkpointer { delay, }); - this.#waitingToCheckpoint.add(runId); - await setTimeout(delay.milliseconds); - - if (!this.#waitingToCheckpoint.has(runId)) { - this.#logger.log("Checkpoint canceled while waiting for retry", { runId }); + try { + await setTimeout(delay.milliseconds, undefined, { signal }); + } catch (error) { + this.#logger.log("Checkpoint canceled during retry delay", { runId }); return { success: false, reason: "CANCELED" }; - } else { - this.#waitingToCheckpoint.delete(runId); } } - const result = await this.#checkpointAndPush({ - runId, - leaveRunning, - projectRef, - deploymentVersion, - attemptNumber, - }); + const result = await this.#checkpointAndPush( + { + runId, + leaveRunning, + projectRef, + deploymentVersion, + attemptNumber, + }, + { signal } + ); if (result.success) { return result; @@ -339,24 +390,6 @@ export class Checkpointer { return result; } - if (result.reason === "IN_PROGRESS") { - this.#logger.log("Checkpoint already in progress, won't retry", { runId }); - this.#failCheckpoint(runId, result.reason); - return result; - } - - if (result.reason === "NO_SUPPORT") { - this.#logger.log("No checkpoint support, won't retry", { runId }); - this.#failCheckpoint(runId, result.reason); - return result; - } - - if (result.reason === "DISABLED") { - this.#logger.log("Checkpoint support disabled, won't retry", { runId }); - this.#failCheckpoint(runId, result.reason); - return result; - } - if (result.reason === "SKIP_RETRYING") { this.#logger.log("Skipping retrying", { runId }); return result; @@ -384,13 +417,16 @@ export class Checkpointer { return { success: false, reason: "ERROR" }; } - async #checkpointAndPush({ - runId, - leaveRunning = true, // This mirrors kubernetes behaviour more accurately - projectRef, - deploymentVersion, - attemptNumber, - }: CheckpointAndPushOptions): Promise { + async #checkpointAndPush( + { + runId, + leaveRunning = true, // This mirrors kubernetes behaviour more accurately + projectRef, + deploymentVersion, + attemptNumber, + }: CheckpointAndPushOptions, + { signal }: { signal: AbortSignal } + ): Promise { await this.init(); const options = { @@ -401,47 +437,12 @@ export class Checkpointer { attemptNumber, }; - if (!this.#dockerMode && !this.#canCheckpoint) { - this.#logger.error("No checkpoint support. 
Simulation requires docker."); - return { success: false, reason: "NO_SUPPORT" }; - } - - if (this.#isCheckpointing(runId)) { - this.#logger.error("Checkpoint procedure already in progress", { options }); - return { success: false, reason: "IN_PROGRESS" }; - } - - // This is a new checkpoint, clear any last failure for this run - this.#clearFailedCheckpoint(runId); - - if (this.disableCheckpointSupport) { - this.#logger.error("Checkpoint support disabled", { options }); - return { success: false, reason: "DISABLED" }; - } - - const controller = new AbortController(); - this.#abortControllers.set(runId, controller); - - const onAbort = () => { - this.#logger.error("Checkpoint aborted", { options }); - controller.signal.removeEventListener("abort", onAbort); - }; - controller.signal.addEventListener("abort", onAbort); - const shortCode = nanoid(8); const imageRef = this.#getImageRef(projectRef, deploymentVersion, shortCode); const exportLocation = this.#getExportLocation(projectRef, deploymentVersion, shortCode); - const buildah = new Buildah({ id: `${runId}-${shortCode}`, abortSignal: controller.signal }); - const crictl = new Crictl({ id: `${runId}-${shortCode}`, abortSignal: controller.signal }); - - const removeCurrentAbortController = () => { - // Ensure only the current controller is removed - if (this.#abortControllers.get(runId) === controller) { - this.#abortControllers.delete(runId); - } - controller.signal.removeEventListener("abort", onAbort); - }; + const buildah = new Buildah({ id: `${runId}-${shortCode}`, abortSignal: signal }); + const crictl = new Crictl({ id: `${runId}-${shortCode}`, abortSignal: signal }); const cleanup = async () => { const metadata = { @@ -452,7 +453,6 @@ export class Checkpointer { if (this.#dockerMode) { this.#logger.debug("Skipping cleanup in docker mode", metadata); - removeCurrentAbortController(); return; } @@ -464,28 +464,26 @@ export class Checkpointer { } catch (error) { this.#logger.error("Error during cleanup", { ...metadata, error }); } - - removeCurrentAbortController(); }; try { await this.chaosMonkey.call(); - this.#logger.log("Checkpointing:", { options }); + this.#logger.log("checkpointAndPush: checkpointing", { options }); const containterName = this.#getRunContainerName(runId); // Create checkpoint (docker) if (this.#dockerMode) { await this.#createDockerCheckpoint( - controller.signal, + signal, runId, exportLocation, leaveRunning, attemptNumber ); - this.#logger.log("checkpoint created:", { + this.#logger.log("checkpointAndPush: checkpoint created", { runId, location: exportLocation, }); @@ -586,13 +584,16 @@ export class Checkpointer { } } - this.#logger.error("Unhandled checkpoint error", { options, error }); + this.#logger.error("Unhandled checkpoint error", { + options, + error: error instanceof Error ? 
error.message : error, + }); return { success: false, reason: "ERROR" }; } finally { await cleanup(); - if (controller.signal.aborted) { + if (signal.aborted) { this.#logger.error("Checkpoint canceled: Cleanup", { options }); // Overrides any prior return value diff --git a/apps/coordinator/src/index.ts b/apps/coordinator/src/index.ts index 65f9520826..01c66417dc 100644 --- a/apps/coordinator/src/index.ts +++ b/apps/coordinator/src/index.ts @@ -175,7 +175,10 @@ class TaskCoordinator { await chaosMonkey.call(); // In case the task resumes before the checkpoint is created - this.#cancelCheckpoint(message.runId); + this.#cancelCheckpoint(message.runId, { + event: "RESUME_AFTER_DEPENDENCY", + completions: message.completions.length, + }); taskSocket.emit("RESUME_AFTER_DEPENDENCY", message); }, @@ -226,7 +229,10 @@ class TaskCoordinator { await chaosMonkey.call(); // In case the task resumes before the checkpoint is created - this.#cancelCheckpoint(message.runId); + this.#cancelCheckpoint(message.runId, { + event: "RESUME_AFTER_DEPENDENCY_WITH_ACK", + completions: message.completions.length, + }); taskSocket.emit("RESUME_AFTER_DEPENDENCY", message); @@ -294,7 +300,7 @@ class TaskCoordinator { log.addFields({ socketId: taskSocket.id, socketData: taskSocket.data }); log.log("Found task socket for REQUEST_RUN_CANCELLATION"); - this.#cancelCheckpoint(message.runId); + this.#cancelCheckpoint(message.runId, { event: "REQUEST_RUN_CANCELLATION", ...message }); if (message.delayInMs) { taskSocket.emit("REQUEST_EXIT", { @@ -728,7 +734,10 @@ class TaskCoordinator { const { completion, execution } = message; // Cancel all in-progress checkpoints (if any) - this.#cancelCheckpoint(socket.data.runId); + this.#cancelCheckpoint(socket.data.runId, { + event: "TASK_RUN_COMPLETED", + attemptNumber: execution.attempt.number, + }); await chaosMonkey.call({ throwErrors: false }); @@ -913,7 +922,10 @@ class TaskCoordinator { try { // Cancel all in-progress checkpoints (if any) - this.#cancelCheckpoint(socket.data.runId); + this.#cancelCheckpoint(socket.data.runId, { + event: "TASK_RUN_FAILED_TO_RUN", + errorType: completion.error.type, + }); this.#platformSocket?.send("TASK_RUN_FAILED_TO_RUN", { version: "v1", @@ -966,12 +978,15 @@ class TaskCoordinator { try { if (message.version === "v1") { - this.#cancelCheckpoint(socket.data.runId); + this.#cancelCheckpoint(socket.data.runId, { event: "CANCEL_CHECKPOINT", ...message }); // v1 has no callback return; } - const checkpointCanceled = this.#cancelCheckpoint(socket.data.runId); + const checkpointCanceled = this.#cancelCheckpoint(socket.data.runId, { + event: "CANCEL_CHECKPOINT", + ...message, + }); callback({ version: "v2", checkpointCanceled }); } catch (error) { @@ -1455,7 +1470,9 @@ class TaskCoordinator { }); } - #cancelCheckpoint(runId: string): boolean { + #cancelCheckpoint(runId: string, reason?: any): boolean { + logger.log("cancelCheckpoint: call", { runId, reason }); + const checkpointWait = this.#checkpointableTasks.get(runId); if (checkpointWait) { @@ -1464,9 +1481,14 @@ class TaskCoordinator { } // Cancel checkpointing procedure - const checkpointCanceled = this.#checkpointer.cancelCheckpoint(runId); + const checkpointCanceled = this.#checkpointer.cancelAllCheckpointsForRun(runId); - logger.log("cancelCheckpoint()", { runId, checkpointCanceled }); + logger.log("cancelCheckpoint: result", { + runId, + reason, + checkpointCanceled, + hadCheckpointWait: !!checkpointWait, + }); return checkpointCanceled; }
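
A standalone sketch (not part of the series) of the cancelable-delay pattern that PATCH 6 builds on top of the delay added in PATCH 1: the AbortSignal-aware setTimeout from node:timers/promises rejects with an AbortError as soon as the signal aborts, so a single AbortController can cancel both the initial delay and the later retry waits. The names below (delayedCheckpoint, doCheckpointWork) are illustrative only and do not exist in the coordinator code.

import { setTimeout } from "node:timers/promises";

// Waits for `delayMs`, then runs the checkpoint work. Aborting the signal
// cancels the pending wait immediately instead of leaving an orphaned timer.
async function delayedCheckpoint(
  doCheckpointWork: (signal: AbortSignal) => Promise<void>,
  delayMs: number,
  signal: AbortSignal
): Promise<"DONE" | "CANCELED"> {
  try {
    // Rejects with an AbortError the moment `signal` aborts
    await setTimeout(delayMs, undefined, { signal });
    await doCheckpointWork(signal);
    return "DONE";
  } catch (error) {
    if (signal.aborted) {
      return "CANCELED";
    }
    throw error;
  }
}

// Usage: abort() cancels a pending delay, e.g. when the run resumes early.
const controller = new AbortController();
const pending = delayedCheckpoint(async () => {}, 30_000, controller.signal);
controller.abort("resumed before checkpoint was created");
void pending.then((outcome) => console.log(outcome)); // "CANCELED"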
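
A hedged sketch of the one-shot resume guard introduced in PATCH 5: the resumedAt column acts as a compare-and-set flag, so only the first resume attempt (with or without a checkpoint) gets to enqueue the run. This assumes a Prisma client and the TaskRunDependency.resumedAt column added by the migration above; the helper name claimResume is illustrative only.

import { PrismaClient } from "@prisma/client";

const prisma = new PrismaClient();

// Returns true only for the caller that flips resumedAt from null to a
// timestamp; every later caller matches zero rows and gets count === 0.
async function claimResume(dependencyId: string): Promise<boolean> {
  const result = await prisma.taskRunDependency.updateMany({
    where: {
      id: dependencyId,
      resumedAt: null, // the "compare" half of the compare-and-set
    },
    data: {
      resumedAt: new Date(), // the "set" half
    },
  });

  return result.count > 0;
}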