diff --git a/apps/coordinator/src/checkpointer.ts b/apps/coordinator/src/checkpointer.ts
index c52060e9c6..269bf6d421 100644
--- a/apps/coordinator/src/checkpointer.ts
+++ b/apps/coordinator/src/checkpointer.ts
@@ -27,7 +27,7 @@ type CheckpointAndPushResult =
   | { success: true; checkpoint: CheckpointData }
   | {
       success: false;
-      reason?: "CANCELED" | "DISABLED" | "ERROR" | "IN_PROGRESS" | "NO_SUPPORT" | "SKIP_RETRYING";
+      reason?: "CANCELED" | "ERROR" | "SKIP_RETRYING";
     };
 
 type CheckpointData = {
@@ -87,9 +87,14 @@ export class Checkpointer {
   #dockerMode: boolean;
 
   #logger = new SimpleStructuredLogger("checkpointer");
-  #abortControllers = new Map();
+
   #failedCheckpoints = new Map();
-  #waitingForRetry = new Set();
+
+  // Indexed by run ID
+  #runAbortControllers = new Map<
+    string,
+    { signal: AbortSignal; abort: AbortController["abort"] }
+  >();
 
   private registryHost: string;
   private registryNamespace: string;
@@ -189,29 +194,80 @@ export class Checkpointer {
     }
   }
 
-  async checkpointAndPush(opts: CheckpointAndPushOptions): Promise<CheckpointData | undefined> {
+  async checkpointAndPush(
+    opts: CheckpointAndPushOptions,
+    delayMs?: number
+  ): Promise<CheckpointData | undefined> {
     const start = performance.now();
     this.#logger.log(`checkpointAndPush() start`, { start, opts });
 
-    let interval: NodeJS.Timer | undefined;
+    const { runId } = opts;
 
+    let interval: NodeJS.Timer | undefined;
     if (opts.shouldHeartbeat) {
       interval = setInterval(() => {
-        this.#logger.log("Sending heartbeat", { runId: opts.runId });
-        this.opts.heartbeat(opts.runId);
+        this.#logger.log("Sending heartbeat", { runId });
+        this.opts.heartbeat(runId);
       }, 20_000);
     }
 
+    const controller = new AbortController();
+    const signal = controller.signal;
+    const abort = controller.abort.bind(controller);
+
+    const onAbort = () => {
+      this.#logger.error("Checkpoint aborted", { runId, options: opts });
+    };
+
+    signal.addEventListener("abort", onAbort, { once: true });
+
+    const removeCurrentAbortController = () => {
+      const controller = this.#runAbortControllers.get(runId);
+
+      // Ensure only the current controller is removed
+      if (controller && controller.signal === signal) {
+        this.#runAbortControllers.delete(runId);
+      }
+
+      // Remove the abort listener in case it hasn't fired
+      signal.removeEventListener("abort", onAbort);
+    };
+
+    if (!this.#dockerMode && !this.#canCheckpoint) {
+      this.#logger.error("No checkpoint support. Simulation requires docker.");
+      this.#failCheckpoint(runId, "NO_SUPPORT");
+      return;
+    }
+
+    if (this.#isRunCheckpointing(runId)) {
+      this.#logger.error("Checkpoint procedure already in progress", { options: opts });
+      this.#failCheckpoint(runId, "IN_PROGRESS");
+      return;
+    }
+
+    // This is a new checkpoint, clear any last failure for this run
+    this.#clearFailedCheckpoint(runId);
+
+    if (this.disableCheckpointSupport) {
+      this.#logger.error("Checkpoint support disabled", { options: opts });
+      this.#failCheckpoint(runId, "DISABLED");
+      return;
+    }
+
+    this.#runAbortControllers.set(runId, { signal, abort });
+
     try {
-      const result = await this.#checkpointAndPushWithBackoff(opts);
+      const result = await this.#checkpointAndPushWithBackoff(opts, { delayMs, signal });
 
       const end = performance.now();
       this.#logger.log(`checkpointAndPush() end`, {
         start,
         end,
         diff: end - start,
+        diffWithoutDelay: end - start - (delayMs ?? 0),
         opts,
         success: result.success,
+        delayMs,
       });
 
       if (!result.success) {
@@ -223,51 +279,66 @@ export class Checkpointer {
       if (opts.shouldHeartbeat) {
         clearInterval(interval);
       }
+      removeCurrentAbortController();
     }
   }
 
-  isCheckpointing(runId: string) {
-    return this.#abortControllers.has(runId) || this.#waitingForRetry.has(runId);
+  #isRunCheckpointing(runId: string) {
+    return this.#runAbortControllers.has(runId);
   }
 
-  cancelCheckpoint(runId: string): boolean {
+  cancelAllCheckpointsForRun(runId: string): boolean {
+    this.#logger.log("cancelAllCheckpointsForRun: call", { runId });
+
     // If the last checkpoint failed, pretend we canceled it
     // This ensures tasks don't wait for external resume messages to continue
     if (this.#hasFailedCheckpoint(runId)) {
+      this.#logger.log("cancelAllCheckpointsForRun: hasFailedCheckpoint", { runId });
      this.#clearFailedCheckpoint(runId);
       return true;
     }
 
-    if (this.#waitingForRetry.has(runId)) {
-      this.#waitingForRetry.delete(runId);
-      return true;
-    }
-
-    const controller = this.#abortControllers.get(runId);
+    const controller = this.#runAbortControllers.get(runId);
 
     if (!controller) {
-      this.#logger.debug("Nothing to cancel", { runId });
+      this.#logger.debug("cancelAllCheckpointsForRun: no abort controller", { runId });
       return false;
     }
 
-    if (controller.signal.aborted) {
-      this.#logger.debug("Controller already aborted", { runId });
+    const { abort, signal } = controller;
+
+    if (signal.aborted) {
+      this.#logger.debug("cancelAllCheckpointsForRun: signal already aborted", { runId });
       return false;
     }
 
-    controller.abort("cancelCheckpoint()");
-    this.#abortControllers.delete(runId);
+    abort("cancelCheckpoint()");
+    this.#runAbortControllers.delete(runId);
 
     return true;
   }
 
-  async #checkpointAndPushWithBackoff({
-    runId,
-    leaveRunning = true, // This mirrors kubernetes behaviour more accurately
-    projectRef,
-    deploymentVersion,
-    attemptNumber,
-  }: CheckpointAndPushOptions): Promise<CheckpointAndPushResult> {
+  async #checkpointAndPushWithBackoff(
+    {
+      runId,
+      leaveRunning = true, // This mirrors kubernetes behaviour more accurately
+      projectRef,
+      deploymentVersion,
+      attemptNumber,
+    }: CheckpointAndPushOptions,
+    { delayMs, signal }: { delayMs?: number; signal: AbortSignal }
+  ): Promise<CheckpointAndPushResult> {
+    if (delayMs && delayMs > 0) {
+      this.#logger.log("Delaying checkpoint", { runId, delayMs });
+
+      try {
+        await setTimeout(delayMs, undefined, { signal });
+      } catch (error) {
+        this.#logger.log("Checkpoint canceled during initial delay", { runId });
+        return { success: false, reason: "CANCELED" };
+      }
+    }
+
     this.#logger.log("Checkpointing with backoff", {
       runId,
       leaveRunning,
@@ -290,24 +361,24 @@ export class Checkpointer {
         delay,
       });
 
-      this.#waitingForRetry.add(runId);
-      await setTimeout(delay.milliseconds);
-
-      if (!this.#waitingForRetry.has(runId)) {
-        this.#logger.log("Checkpoint canceled while waiting for retry", { runId });
+      try {
+        await setTimeout(delay.milliseconds, undefined, { signal });
+      } catch (error) {
+        this.#logger.log("Checkpoint canceled during retry delay", { runId });
         return { success: false, reason: "CANCELED" };
-      } else {
-        this.#waitingForRetry.delete(runId);
       }
     }
 
-    const result = await this.#checkpointAndPush({
-      runId,
-      leaveRunning,
-      projectRef,
-      deploymentVersion,
-      attemptNumber,
-    });
+    const result = await this.#checkpointAndPush(
+      {
+        runId,
+        leaveRunning,
+        projectRef,
+        deploymentVersion,
+        attemptNumber,
+      },
+      { signal }
+    );
 
     if (result.success) {
       return result;
@@ -319,24 +390,6 @@ export class Checkpointer {
       return result;
     }
 
-    if (result.reason === "IN_PROGRESS") {
-      this.#logger.log("Checkpoint already in progress, won't retry", { runId });
-      this.#failCheckpoint(runId, result.reason);
-      return result;
-    }
-
-    if (result.reason === "NO_SUPPORT") {
-      this.#logger.log("No checkpoint support, won't retry", { runId });
-      this.#failCheckpoint(runId, result.reason);
-      return result;
-    }
-
-    if (result.reason === "DISABLED") {
-      this.#logger.log("Checkpoint support disabled, won't retry", { runId });
-      this.#failCheckpoint(runId, result.reason);
-      return result;
-    }
-
     if (result.reason === "SKIP_RETRYING") {
       this.#logger.log("Skipping retrying", { runId });
       return result;
@@ -364,13 +417,16 @@ export class Checkpointer {
     return { success: false, reason: "ERROR" };
   }
 
-  async #checkpointAndPush({
-    runId,
-    leaveRunning = true, // This mirrors kubernetes behaviour more accurately
-    projectRef,
-    deploymentVersion,
-    attemptNumber,
-  }: CheckpointAndPushOptions): Promise<CheckpointAndPushResult> {
+  async #checkpointAndPush(
+    {
+      runId,
+      leaveRunning = true, // This mirrors kubernetes behaviour more accurately
+      projectRef,
+      deploymentVersion,
+      attemptNumber,
+    }: CheckpointAndPushOptions,
+    { signal }: { signal: AbortSignal }
+  ): Promise<CheckpointAndPushResult> {
     await this.init();
 
     const options = {
@@ -381,47 +437,12 @@ export class Checkpointer {
       attemptNumber,
     };
 
-    if (!this.#dockerMode && !this.#canCheckpoint) {
-      this.#logger.error("No checkpoint support. Simulation requires docker.");
-      return { success: false, reason: "NO_SUPPORT" };
-    }
-
-    if (this.isCheckpointing(runId)) {
-      this.#logger.error("Checkpoint procedure already in progress", { options });
-      return { success: false, reason: "IN_PROGRESS" };
-    }
-
-    // This is a new checkpoint, clear any last failure for this run
-    this.#clearFailedCheckpoint(runId);
-
-    if (this.disableCheckpointSupport) {
-      this.#logger.error("Checkpoint support disabled", { options });
-      return { success: false, reason: "DISABLED" };
-    }
-
-    const controller = new AbortController();
-    this.#abortControllers.set(runId, controller);
-
-    const onAbort = () => {
-      this.#logger.error("Checkpoint aborted", { options });
-      controller.signal.removeEventListener("abort", onAbort);
-    };
-    controller.signal.addEventListener("abort", onAbort);
-
     const shortCode = nanoid(8);
     const imageRef = this.#getImageRef(projectRef, deploymentVersion, shortCode);
     const exportLocation = this.#getExportLocation(projectRef, deploymentVersion, shortCode);
 
-    const buildah = new Buildah({ id: `${runId}-${shortCode}`, abortSignal: controller.signal });
-    const crictl = new Crictl({ id: `${runId}-${shortCode}`, abortSignal: controller.signal });
-
-    const removeCurrentAbortController = () => {
-      // Ensure only the current controller is removed
-      if (this.#abortControllers.get(runId) === controller) {
-        this.#abortControllers.delete(runId);
-      }
-      controller.signal.removeEventListener("abort", onAbort);
-    };
+    const buildah = new Buildah({ id: `${runId}-${shortCode}`, abortSignal: signal });
+    const crictl = new Crictl({ id: `${runId}-${shortCode}`, abortSignal: signal });
 
     const cleanup = async () => {
       const metadata = {
@@ -432,7 +453,6 @@ export class Checkpointer {
 
       if (this.#dockerMode) {
        this.#logger.debug("Skipping cleanup in docker mode", metadata);
-        removeCurrentAbortController();
         return;
       }
 
@@ -444,28 +464,26 @@ export class Checkpointer {
       } catch (error) {
         this.#logger.error("Error during cleanup", { ...metadata, error });
       }
-
-      removeCurrentAbortController();
     };
 
     try {
       await this.chaosMonkey.call();
 
-      this.#logger.log("Checkpointing:", { options });
+      this.#logger.log("checkpointAndPush: checkpointing", { options });
 
       const containterName = this.#getRunContainerName(runId);
 
       // Create checkpoint (docker)
       if (this.#dockerMode) {
         await this.#createDockerCheckpoint(
-          controller.signal,
+          signal,
           runId,
           exportLocation,
           leaveRunning,
           attemptNumber
         );
 
-        this.#logger.log("checkpoint created:", {
+        this.#logger.log("checkpointAndPush: checkpoint created", {
           runId,
           location: exportLocation,
         });
@@ -566,13 +584,16 @@ export class Checkpointer {
         }
       }
 
-      this.#logger.error("Unhandled checkpoint error", { options, error });
+      this.#logger.error("Unhandled checkpoint error", {
+        options,
+        error: error instanceof Error ? error.message : error,
+      });
 
       return { success: false, reason: "ERROR" };
     } finally {
       await cleanup();
 
-      if (controller.signal.aborted) {
+      if (signal.aborted) {
         this.#logger.error("Checkpoint canceled: Cleanup", { options });
 
         // Overrides any prior return value
@@ -581,6 +602,16 @@ export class Checkpointer {
     }
   }
 
+  async unpause(runId: string, attemptNumber?: number): Promise<void> {
+    try {
+      const containterNameWithAttempt = this.#getRunContainerName(runId, attemptNumber);
+      const exec = new Exec({ logger: this.#logger });
+      await exec.x("docker", ["unpause", containterNameWithAttempt]);
+    } catch (error) {
+      this.#logger.error("[Docker] Error during unpause", { runId, attemptNumber, error });
+    }
+  }
+
   async #createDockerCheckpoint(
     abortSignal: AbortSignal,
     runId: string,
diff --git a/apps/coordinator/src/index.ts b/apps/coordinator/src/index.ts
index aa5a6062b7..01c66417dc 100644
--- a/apps/coordinator/src/index.ts
+++ b/apps/coordinator/src/index.ts
@@ -35,6 +35,11 @@ const TASK_RUN_COMPLETED_WITH_ACK_TIMEOUT_MS =
 const TASK_RUN_COMPLETED_WITH_ACK_MAX_RETRIES =
   parseInt(process.env.TASK_RUN_COMPLETED_WITH_ACK_MAX_RETRIES || "") || 7;
 
+const WAIT_FOR_TASK_CHECKPOINT_DELAY_MS =
+  parseInt(process.env.WAIT_FOR_TASK_CHECKPOINT_DELAY_MS || "") || 0;
+const WAIT_FOR_BATCH_CHECKPOINT_DELAY_MS =
+  parseInt(process.env.WAIT_FOR_BATCH_CHECKPOINT_DELAY_MS || "") || 0;
+
 const logger = new SimpleStructuredLogger("coordinator", undefined, { nodeName: NODE_NAME });
 const chaosMonkey = new ChaosMonkey(
   !!process.env.CHAOS_MONKEY_ENABLED,
@@ -143,6 +148,7 @@ class TaskCoordinator {
       authToken: PLATFORM_SECRET,
       logHandlerPayloads: false,
       handlers: {
+        // This is used by resumeAttempt
         RESUME_AFTER_DEPENDENCY: async (message) => {
           const log = platformLogger.child({
             eventName: "RESUME_AFTER_DEPENDENCY",
@@ -168,11 +174,15 @@ class TaskCoordinator {
 
           await chaosMonkey.call();
 
-          // In case the task resumed faster than we could checkpoint
-          this.#cancelCheckpoint(message.runId);
+          // In case the task resumes before the checkpoint is created
+          this.#cancelCheckpoint(message.runId, {
+            event: "RESUME_AFTER_DEPENDENCY",
+            completions: message.completions.length,
+          });
 
           taskSocket.emit("RESUME_AFTER_DEPENDENCY", message);
         },
+        // This is used by sharedQueueConsumer
         RESUME_AFTER_DEPENDENCY_WITH_ACK: async (message) => {
           const log = platformLogger.child({
             eventName: "RESUME_AFTER_DEPENDENCY_WITH_ACK",
@@ -218,8 +228,11 @@ class TaskCoordinator {
 
           await chaosMonkey.call();
 
-          // In case the task resumed faster than we could checkpoint
-          this.#cancelCheckpoint(message.runId);
+          // In case the task resumes before the checkpoint is created
+          this.#cancelCheckpoint(message.runId, {
+            event: "RESUME_AFTER_DEPENDENCY_WITH_ACK",
+            completions: message.completions.length,
+          });
 
           taskSocket.emit("RESUME_AFTER_DEPENDENCY", message);
 
@@ -287,7 +300,7 @@ class TaskCoordinator {
           log.addFields({ socketId: taskSocket.id, socketData: taskSocket.data });
           log.log("Found task socket for REQUEST_RUN_CANCELLATION");
 
-          this.#cancelCheckpoint(message.runId);
+          this.#cancelCheckpoint(message.runId, { event: "REQUEST_RUN_CANCELLATION", ...message });
 
           if (message.delayInMs) {
             taskSocket.emit("REQUEST_EXIT", {
@@ -721,7 +734,10 @@ class TaskCoordinator {
        const { completion, execution } = message;
 
        // Cancel all in-progress checkpoints (if any)
-        this.#cancelCheckpoint(socket.data.runId);
+        this.#cancelCheckpoint(socket.data.runId, {
+          event: "TASK_RUN_COMPLETED",
+          attemptNumber: execution.attempt.number,
+        });
 
        await chaosMonkey.call({ throwErrors: false });
 
@@ -906,7 +922,10 @@ class TaskCoordinator {
 
        try {
          // Cancel all in-progress checkpoints (if any)
-          this.#cancelCheckpoint(socket.data.runId);
+          this.#cancelCheckpoint(socket.data.runId, {
+            event: "TASK_RUN_FAILED_TO_RUN",
+            errorType: completion.error.type,
+          });
 
          this.#platformSocket?.send("TASK_RUN_FAILED_TO_RUN", {
            version: "v1",
@@ -959,12 +978,15 @@ class TaskCoordinator {
 
      try {
        if (message.version === "v1") {
-          this.#cancelCheckpoint(socket.data.runId);
+          this.#cancelCheckpoint(socket.data.runId, { event: "CANCEL_CHECKPOINT", ...message });
          // v1 has no callback
          return;
        }
 
-        const checkpointCanceled = this.#cancelCheckpoint(socket.data.runId);
+        const checkpointCanceled = this.#cancelCheckpoint(socket.data.runId, {
+          event: "CANCEL_CHECKPOINT",
+          ...message,
+        });
 
        callback({ version: "v2", checkpointCanceled });
      } catch (error) {
@@ -1008,11 +1030,14 @@ class TaskCoordinator {
          return;
        }
 
+        const runId = socket.data.runId;
+        const attemptNumber = getAttemptNumber();
+
        const checkpoint = await this.#checkpointer.checkpointAndPush({
-          runId: socket.data.runId,
+          runId,
          projectRef: socket.data.projectRef,
          deploymentVersion: socket.data.deploymentVersion,
-          attemptNumber: getAttemptNumber(),
+          attemptNumber,
        });
 
        if (!checkpoint) {
@@ -1038,6 +1063,13 @@ class TaskCoordinator {
 
        if (ack?.keepRunAlive) {
          log.log("keeping run alive after duration checkpoint");
+
+          if (checkpoint.docker && willSimulate) {
+            // The container is still paused so we need to unpause it
+            log.log("unpausing container after duration checkpoint");
+            this.#checkpointer.unpause(runId, attemptNumber);
+          }
+
          return;
        }
 
@@ -1096,12 +1128,18 @@ class TaskCoordinator {
          }
        }
 
-        const checkpoint = await this.#checkpointer.checkpointAndPush({
-          runId: socket.data.runId,
-          projectRef: socket.data.projectRef,
-          deploymentVersion: socket.data.deploymentVersion,
-          attemptNumber: getAttemptNumber(),
-        });
+        const runId = socket.data.runId;
+        const attemptNumber = getAttemptNumber();
+
+        const checkpoint = await this.#checkpointer.checkpointAndPush(
+          {
+            runId,
+            projectRef: socket.data.projectRef,
+            deploymentVersion: socket.data.deploymentVersion,
+            attemptNumber,
+          },
+          WAIT_FOR_TASK_CHECKPOINT_DELAY_MS
+        );
 
        if (!checkpoint) {
          log.error("Failed to checkpoint");
@@ -1131,6 +1169,13 @@ class TaskCoordinator {
 
        if (ack?.keepRunAlive) {
          socket.data.requiresCheckpointResumeWithMessage = undefined;
          log.log("keeping run alive after task checkpoint");
+
+          if (checkpoint.docker && willSimulate) {
+            // The container is still paused so we need to unpause it
+            log.log("unpausing container after duration checkpoint");
+            this.#checkpointer.unpause(runId, attemptNumber);
+          }
+
          return;
        }
@@ -1189,12 +1234,18 @@ class TaskCoordinator {
          }
        }
 
-        const checkpoint = await this.#checkpointer.checkpointAndPush({
-          runId: socket.data.runId,
-          projectRef: socket.data.projectRef,
-          deploymentVersion: socket.data.deploymentVersion,
-          attemptNumber: getAttemptNumber(),
-        });
+        const runId = socket.data.runId;
+        const attemptNumber = getAttemptNumber();
+
+        const checkpoint = await this.#checkpointer.checkpointAndPush(
+          {
+            runId,
+            projectRef: socket.data.projectRef,
+            deploymentVersion: socket.data.deploymentVersion,
+            attemptNumber,
+          },
+          WAIT_FOR_BATCH_CHECKPOINT_DELAY_MS
+        );
 
        if (!checkpoint) {
          log.error("Failed to checkpoint");
@@ -1225,6 +1276,13 @@ class TaskCoordinator {
 
        if (ack?.keepRunAlive) {
          socket.data.requiresCheckpointResumeWithMessage = undefined;
          log.log("keeping run alive after batch checkpoint");
+
+          if (checkpoint.docker && willSimulate) {
+            // The container is still paused so we need to unpause it
+            log.log("unpausing container after batch checkpoint");
+            this.#checkpointer.unpause(runId, attemptNumber);
+          }
+
          return;
        }
@@ -1412,7 +1470,9 @@ class TaskCoordinator {
    });
  }
 
-  #cancelCheckpoint(runId: string): boolean {
+  #cancelCheckpoint(runId: string, reason?: any): boolean {
+    logger.log("cancelCheckpoint: call", { runId, reason });
+
    const checkpointWait = this.#checkpointableTasks.get(runId);
 
    if (checkpointWait) {
@@ -1421,9 +1481,14 @@ class TaskCoordinator {
    }
 
    // Cancel checkpointing procedure
-    const checkpointCanceled = this.#checkpointer.cancelCheckpoint(runId);
+    const checkpointCanceled = this.#checkpointer.cancelAllCheckpointsForRun(runId);
 
-    logger.log("cancelCheckpoint()", { runId, checkpointCanceled });
+    logger.log("cancelCheckpoint: result", {
+      runId,
+      reason,
+      checkpointCanceled,
+      hadCheckpointWait: !!checkpointWait,
+    });
 
    return checkpointCanceled;
  }
diff --git a/apps/webapp/app/v3/services/createCheckpoint.server.ts b/apps/webapp/app/v3/services/createCheckpoint.server.ts
index 84d695e80f..5b0367c22b 100644
--- a/apps/webapp/app/v3/services/createCheckpoint.server.ts
+++ b/apps/webapp/app/v3/services/createCheckpoint.server.ts
@@ -93,6 +93,99 @@ export class CreateCheckpointService extends BaseService {
      };
    }
 
+    const { reason } = params;
+
+    // Check if we should accept this checkpoint
+    switch (reason.type) {
+      case "MANUAL": {
+        // Always accept manual checkpoints
+        break;
+      }
+      case "WAIT_FOR_DURATION": {
+        // Always accept duration checkpoints
+        break;
+      }
+      case "WAIT_FOR_TASK": {
+        const childRun = await this._prisma.taskRun.findFirst({
+          where: {
+            friendlyId: reason.friendlyId,
+          },
+          select: {
+            dependency: {
+              select: {
+                resumedAt: true,
+              },
+            },
+          },
+        });
+
+        if (!childRun) {
+          logger.error("CreateCheckpointService: Pre-check - WAIT_FOR_TASK child run not found", {
+            friendlyId: reason.friendlyId,
+            params,
+          });
+
+          return {
+            success: false,
+            keepRunAlive: false,
+          };
+        }
+
+        if (childRun.dependency?.resumedAt) {
+          logger.error("CreateCheckpointService: Child run already resumed", {
+            childRun,
+            params,
+          });
+
+          return {
+            success: false,
+            keepRunAlive: true,
+          };
+        }
+
+        break;
+      }
+      case "WAIT_FOR_BATCH": {
+        const batchRun = await this._prisma.batchTaskRun.findFirst({
+          where: {
+            friendlyId: reason.batchFriendlyId,
+          },
+          select: {
+            resumedAt: true,
+          },
+        });
+
+        if (!batchRun) {
+          logger.error("CreateCheckpointService: Pre-check - Batch not found", {
+            batchFriendlyId: reason.batchFriendlyId,
+            params,
+          });
+
+          return {
+            success: false,
+            keepRunAlive: false,
+          };
+        }
+
+        if (batchRun.resumedAt) {
+          logger.error("CreateCheckpointService: Batch already resumed", {
+            batchRun,
+            params,
+          });
+
+          return {
+            success: false,
+            keepRunAlive: true,
+          };
+        }
+
+        break;
+      }
+      default: {
+        break;
+      }
+    }
+
    //sleep to test slow checkpoints
    // Sleep a random value between 4 and 30 seconds
    // await new Promise((resolve) => {
@@ -146,8 +239,6 @@ export class CreateCheckpointService extends BaseService {
      },
    });
 
-    const { reason } = params;
-
    let checkpointEvent: CheckpointRestoreEvent | undefined;
 
    switch (reason.type) {
diff --git a/apps/webapp/app/v3/services/resumeTaskDependency.server.ts b/apps/webapp/app/v3/services/resumeTaskDependency.server.ts
index c941cd693c..de99b526c4 100644
--- a/apps/webapp/app/v3/services/resumeTaskDependency.server.ts
+++ b/apps/webapp/app/v3/services/resumeTaskDependency.server.ts
@@ -3,6 +3,7 @@ import { workerQueue } from "~/services/worker.server";
 import { MarQS, marqs, MarQSPriorityLevel } from "~/v3/marqs/index.server";
 import { BaseService } from "./baseService.server";
 import { logger } from "~/services/logger.server";
+import { TaskRunDependency } from "@trigger.dev/database";
 
 export class ResumeTaskDependencyService extends BaseService {
   public async call(dependencyId: string, sourceTaskAttemptId: string) {
@@ -50,6 +51,19 @@ export class ResumeTaskDependencyService extends BaseService {
         }
       );
 
+      const wasUpdated = await this.#setDependencyToResumedOnce(dependency);
+
+      if (!wasUpdated) {
+        logger.debug("Task dependency resume: Attempt with checkpoint was already resumed", {
+          attemptId: dependency.id,
+          dependentAttempt: dependency.dependentAttempt,
+          checkpointEventId: dependency.checkpointEventId,
+          hasCheckpointEvent: !!dependency.checkpointEventId,
+          runId: dependentRun.id,
+        });
+        return;
+      }
+
       // TODO: use the new priority queue thingie
       await marqs?.enqueueMessage(
         dependency.taskRun.runtimeEnvironment,
@@ -89,6 +103,19 @@ export class ResumeTaskDependencyService extends BaseService {
         return;
       }
 
+      const wasUpdated = await this.#setDependencyToResumedOnce(dependency);
+
+      if (!wasUpdated) {
+        logger.debug("Task dependency resume: Attempt without checkpoint was already resumed", {
+          attemptId: dependency.id,
+          dependentAttempt: dependency.dependentAttempt,
+          checkpointEventId: dependency.checkpointEventId,
+          hasCheckpointEvent: !!dependency.checkpointEventId,
+          runId: dependentRun.id,
+        });
+        return;
+      }
+
       await marqs?.requeueMessage(
         dependentRun.id,
         {
@@ -107,6 +134,26 @@
     }
   }
 
+  async #setDependencyToResumedOnce(dependency: TaskRunDependency) {
+    const result = await this._prisma.taskRunDependency.updateMany({
+      where: {
+        id: dependency.id,
+        resumedAt: null,
+      },
+      data: {
+        resumedAt: new Date(),
+      },
+    });
+
+    // Check if any records were updated
+    if (result.count > 0) {
+      // The status was changed, so we return true
+      return true;
+    } else {
+      return false;
+    }
+  }
+
   static async enqueue(
     dependencyId: string,
     sourceTaskAttemptId: string,
diff --git a/internal-packages/database/prisma/migrations/20250218124428_add_resumed_at_to_task_run_dependency/migration.sql b/internal-packages/database/prisma/migrations/20250218124428_add_resumed_at_to_task_run_dependency/migration.sql
new file mode 100644
index 0000000000..032ff92fd4
--- /dev/null
+++ b/internal-packages/database/prisma/migrations/20250218124428_add_resumed_at_to_task_run_dependency/migration.sql
@@ -0,0 +1,2 @@
+-- AlterTable
+ALTER TABLE "TaskRunDependency" ADD COLUMN "resumedAt" TIMESTAMP(3);
diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma
index 7ff86d95e1..7617be6dd5 100644
--- a/internal-packages/database/prisma/schema.prisma
+++ b/internal-packages/database/prisma/schema.prisma
@@ -1917,6 +1917,7 @@ model TaskRunDependency {
   createdAt DateTime @default(now())
   updatedAt DateTime @updatedAt
 
+  resumedAt DateTime?
 
   @@index([dependentAttemptId])
   @@index([dependentBatchRunId])
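
Note (not part of the patch): the new delay and retry logic in checkpointer.ts relies on the `setTimeout` export of `node:timers/promises`, which rejects if the supplied AbortSignal fires before the delay elapses. A minimal standalone sketch of that mechanism, with hypothetical names, assuming Node 16+:

import { setTimeout } from "node:timers/promises";

// Resolves true if the full delay elapsed, false if the signal aborted it first.
async function delayUnlessAborted(delayMs: number, signal: AbortSignal): Promise<boolean> {
  try {
    await setTimeout(delayMs, undefined, { signal }); // rejects with an AbortError on abort
    return true;
  } catch {
    return false;
  }
}

async function main() {
  const controller = new AbortController();

  // Simulate a resume arriving before the checkpoint delay has elapsed.
  setTimeout(100).then(() => controller.abort("resumed before checkpoint"));

  const elapsed = await delayUnlessAborted(5_000, controller.signal);
  console.log(elapsed ? "delay elapsed, checkpoint would proceed" : "delay aborted, checkpoint skipped");
}

main();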
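
Note (not part of the patch): the coordinator now keeps one abort controller per run in `#runAbortControllers`, and `removeCurrentAbortController` only deletes the entry whose signal matches the current call, so a finishing checkpoint cannot de-register a newer attempt for the same run. A sketch of that registry pattern with made-up names:

// Illustrative sketch: one abort controller per run ID, where a finishing call only
// removes its own entry. Mirrors the #runAbortControllers map and the
// "Ensure only the current controller is removed" guard; class and method names are invented.
class RunAbortRegistry {
  private controllers = new Map<string, AbortController>();

  start(runId: string): AbortSignal {
    const controller = new AbortController();
    this.controllers.set(runId, controller); // a newer call for the same run replaces the entry
    return controller.signal;
  }

  // Called from a finally block; removes the entry only if it still belongs to this call.
  finish(runId: string, signal: AbortSignal): void {
    const current = this.controllers.get(runId);
    if (current && current.signal === signal) {
      this.controllers.delete(runId);
    }
  }

  cancel(runId: string): boolean {
    const current = this.controllers.get(runId);
    if (!current || current.signal.aborted) return false;
    current.abort("canceled");
    this.controllers.delete(runId);
    return true;
  }
}

Storing `{ signal, abort }` rather than the whole controller, as the patch does, narrows what callers of the registry can reach; the sketch keeps the controller only to stay short.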
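
Note (not part of the patch): `#setDependencyToResumedOnce` uses `updateMany` filtered on `resumedAt: null` as a compare-and-set, so if the resume path runs twice for the same dependency only the first caller sees `count > 0` and enqueues the run, while CreateCheckpointService's new pre-check reads `resumedAt` to reject checkpoints that arrive after the resume. A standalone sketch of the same guard; the `PrismaClient` export from `@trigger.dev/database` is assumed here:

import { PrismaClient } from "@trigger.dev/database"; // assumed export, mirroring the app's internal package

const prisma = new PrismaClient();

// Returns true only for the first caller; later callers match no rows and get count 0.
async function markDependencyResumedOnce(dependencyId: string): Promise<boolean> {
  const result = await prisma.taskRunDependency.updateMany({
    where: { id: dependencyId, resumedAt: null }, // the null filter is the compare-and-set
    data: { resumedAt: new Date() },
  });

  return result.count > 0;
}

The `resumedAt: null` filter is what makes this atomic: without it, both racing callers would update the row and both would believe they won.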