diff --git a/.changeset/sixty-donuts-fail.md b/.changeset/sixty-donuts-fail.md new file mode 100644 index 0000000000..c8828e3d05 --- /dev/null +++ b/.changeset/sixty-donuts-fail.md @@ -0,0 +1,5 @@ +--- +"trigger.dev": patch +--- + +Improve resume reliability by replaying ready signal of restored workers diff --git a/packages/cli-v3/src/entryPoints/deploy-run-controller.ts b/packages/cli-v3/src/entryPoints/deploy-run-controller.ts index 6d449a8b69..c06ff26ecd 100644 --- a/packages/cli-v3/src/entryPoints/deploy-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/deploy-run-controller.ts @@ -97,6 +97,12 @@ class ProdWorker { idempotencyKey: string; } | undefined; + private readyForResumeReplay: + | { + idempotencyKey: string; + type: WaitReason; + } + | undefined; #httpPort: number; #httpServer: ReturnType; @@ -365,10 +371,18 @@ class ProdWorker { async #prepareForRetry() { // Clear state for retrying this.paused = false; + this.nextResumeAfter = undefined; this.waitForPostStart = false; this.executing = false; this.attemptFriendlyId = undefined; this.attemptNumber = undefined; + + // Clear replay state + this.waitForTaskReplay = undefined; + this.waitForBatchReplay = undefined; + this.readyForLazyAttemptReplay = undefined; + this.durationResumeFallback = undefined; + this.readyForResumeReplay = undefined; } // MARK: CHECKPOINT PREP @@ -405,6 +419,7 @@ class ProdWorker { this.waitForPostStart = false; this.durationResumeFallback = undefined; + this.readyForResumeReplay = undefined; this._taskRunProcess?.waitCompletedNotification(); } @@ -412,6 +427,8 @@ class ProdWorker { async #readyForLazyAttempt() { const idempotencyKey = randomUUID(); + logger.log("ready for lazy attempt", { idempotencyKey }); + this.readyForLazyAttemptReplay = { idempotencyKey, }; @@ -420,7 +437,7 @@ class ProdWorker { // ..but we also have to be fast to avoid failing the task due to missing heartbeat for await (const { delay, retry } of defaultBackoff.min(10).maxRetries(7)) { if (retry > 0) { - logger.log("retrying ready for lazy attempt", { retry }); + logger.log("retrying ready for lazy attempt", { retry, idempotencyKey }); } this.#coordinatorSocket.socket.emit("READY_FOR_LAZY_ATTEMPT", { @@ -453,6 +470,93 @@ class ProdWorker { this.#failRun(this.runId, "Failed to receive execute request in a reasonable time"); } + async #readyForResume() { + const idempotencyKey = randomUUID(); + + logger.log("readyForResume()", { + nextResumeAfter: this.nextResumeAfter, + attemptFriendlyId: this.attemptFriendlyId, + attemptNumber: this.attemptNumber, + idempotencyKey, + }); + + if (!this.nextResumeAfter) { + logger.error("Missing next resume reason", { status: this.#status }); + + this.#emitUnrecoverableError( + "NoNextResume", + "Next resume reason not set while resuming from paused state" + ); + + return; + } + + if (!this.attemptFriendlyId) { + logger.error("Missing attempt friendly ID", { status: this.#status }); + + this.#emitUnrecoverableError( + "NoAttemptId", + "Attempt ID not set while resuming from paused state" + ); + + return; + } + + if (!this.attemptNumber) { + logger.error("Missing attempt number", { status: this.#status }); + + this.#emitUnrecoverableError( + "NoAttemptNumber", + "Attempt number not set while resuming from paused state" + ); + + return; + } + + this.readyForResumeReplay = { + idempotencyKey, + type: this.nextResumeAfter, + }; + + const lockedMetadata = { + attemptFriendlyId: this.attemptFriendlyId, + attemptNumber: this.attemptNumber, + type: this.nextResumeAfter, + }; + + // Retry if we don't receive RESUME_AFTER_DEPENDENCY or RESUME_AFTER_DURATION in a reasonable time + // ..but we also have to be fast to avoid failing the task due to missing heartbeat + for await (const { delay, retry } of defaultBackoff.min(10).maxRetries(7)) { + if (retry > 0) { + logger.log("retrying ready for resume", { retry, idempotencyKey }); + } + + this.#coordinatorSocket.socket.emit("READY_FOR_RESUME", { + version: "v2", + ...lockedMetadata, + }); + + await timeout(delay.milliseconds); + + if (!this.readyForResumeReplay) { + logger.log("replay ready for resume cancelled, discarding", { + idempotencyKey, + }); + + return; + } + + if (idempotencyKey !== this.readyForResumeReplay.idempotencyKey) { + logger.log("replay ready for resume idempotency key mismatch, discarding", { + idempotencyKey, + newIdempotencyKey: this.readyForResumeReplay.idempotencyKey, + }); + + return; + } + } + } + #readyForCheckpoint() { this.#coordinatorSocket.socket.emit("READY_FOR_CHECKPOINT", { version: "v1" }); } @@ -630,6 +734,7 @@ class ProdWorker { this.paused = false; this.nextResumeAfter = undefined; this.waitForPostStart = false; + this.readyForResumeReplay = undefined; for (let i = 0; i < completions.length; i++) { const completion = completions[i]; @@ -845,46 +950,7 @@ class ProdWorker { } if (this.paused) { - if (!this.nextResumeAfter) { - logger.error("Missing next resume reason", { status: this.#status }); - - this.#emitUnrecoverableError( - "NoNextResume", - "Next resume reason not set while resuming from paused state" - ); - - return; - } - - if (!this.attemptFriendlyId) { - logger.error("Missing attempt friendly ID", { status: this.#status }); - - this.#emitUnrecoverableError( - "NoAttemptId", - "Attempt ID not set while resuming from paused state" - ); - - return; - } - - if (!this.attemptNumber) { - logger.error("Missing attempt number", { status: this.#status }); - - this.#emitUnrecoverableError( - "NoAttemptNumber", - "Attempt number not set while resuming from paused state" - ); - - return; - } - - socket.emit("READY_FOR_RESUME", { - version: "v2", - attemptFriendlyId: this.attemptFriendlyId, - attemptNumber: this.attemptNumber, - type: this.nextResumeAfter, - }); - + await this.#readyForResume(); return; } @@ -1293,6 +1359,9 @@ class ProdWorker { attemptNumber: this.attemptNumber, waitForTaskReplay: this.waitForTaskReplay, waitForBatchReplay: this.waitForBatchReplay, + readyForLazyAttemptReplay: this.readyForLazyAttemptReplay, + durationResumeFallback: this.durationResumeFallback, + readyForResumeReplay: this.readyForResumeReplay, }; }