From 926a6ff9a6b5162e52550dca19fcc349ded51888 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Sun, 3 Nov 2024 11:52:27 +0000 Subject: [PATCH 1/3] replay ready for resume --- .../src/entryPoints/deploy-run-controller.ts | 143 +++++++++++++----- 1 file changed, 102 insertions(+), 41 deletions(-) diff --git a/packages/cli-v3/src/entryPoints/deploy-run-controller.ts b/packages/cli-v3/src/entryPoints/deploy-run-controller.ts index 6d449a8b69..098f8344d2 100644 --- a/packages/cli-v3/src/entryPoints/deploy-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/deploy-run-controller.ts @@ -97,6 +97,12 @@ class ProdWorker { idempotencyKey: string; } | undefined; + private readyForResumeReplay: + | { + idempotencyKey: string; + type: WaitReason; + } + | undefined; #httpPort: number; #httpServer: ReturnType; @@ -405,6 +411,7 @@ class ProdWorker { this.waitForPostStart = false; this.durationResumeFallback = undefined; + this.readyForResumeReplay = undefined; this._taskRunProcess?.waitCompletedNotification(); } @@ -412,6 +419,8 @@ class ProdWorker { async #readyForLazyAttempt() { const idempotencyKey = randomUUID(); + logger.log("ready for lazy attempt", { idempotencyKey }); + this.readyForLazyAttemptReplay = { idempotencyKey, }; @@ -420,7 +429,7 @@ class ProdWorker { // ..but we also have to be fast to avoid failing the task due to missing heartbeat for await (const { delay, retry } of defaultBackoff.min(10).maxRetries(7)) { if (retry > 0) { - logger.log("retrying ready for lazy attempt", { retry }); + logger.log("retrying ready for lazy attempt", { retry, idempotencyKey }); } this.#coordinatorSocket.socket.emit("READY_FOR_LAZY_ATTEMPT", { @@ -453,6 +462,93 @@ class ProdWorker { this.#failRun(this.runId, "Failed to receive execute request in a reasonable time"); } + async #readyForResume() { + const idempotencyKey = randomUUID(); + + logger.log("readyForResume()", { + nextResumeAfter: this.nextResumeAfter, + attemptFriendlyId: this.attemptFriendlyId, + attemptNumber: this.attemptNumber, + idempotencyKey, + }); + + if (!this.nextResumeAfter) { + logger.error("Missing next resume reason", { status: this.#status }); + + this.#emitUnrecoverableError( + "NoNextResume", + "Next resume reason not set while resuming from paused state" + ); + + return; + } + + if (!this.attemptFriendlyId) { + logger.error("Missing attempt friendly ID", { status: this.#status }); + + this.#emitUnrecoverableError( + "NoAttemptId", + "Attempt ID not set while resuming from paused state" + ); + + return; + } + + if (!this.attemptNumber) { + logger.error("Missing attempt number", { status: this.#status }); + + this.#emitUnrecoverableError( + "NoAttemptNumber", + "Attempt number not set while resuming from paused state" + ); + + return; + } + + this.readyForResumeReplay = { + idempotencyKey, + type: this.nextResumeAfter, + }; + + const lockedMetadata = { + attemptFriendlyId: this.attemptFriendlyId, + attemptNumber: this.attemptNumber, + type: this.nextResumeAfter, + }; + + // Retry if we don't receive RESUME_AFTER_DEPENDENCY or RESUME_AFTER_DURATION in a reasonable time + // ..but we also have to be fast to avoid failing the task due to missing heartbeat + for await (const { delay, retry } of defaultBackoff.min(10).maxRetries(7)) { + if (retry > 0) { + logger.log("retrying ready for resume", { retry, idempotencyKey }); + } + + this.#coordinatorSocket.socket.emit("READY_FOR_RESUME", { + version: "v2", + ...lockedMetadata, + }); + + await timeout(delay.milliseconds); + + if (!this.readyForResumeReplay) { + logger.log("replay ready for resume cancelled, discarding", { + idempotencyKey, + }); + + return; + } + + if (idempotencyKey !== this.readyForResumeReplay.idempotencyKey) { + logger.log("replay ready for resume idempotency key mismatch, discarding", { + idempotencyKey, + newIdempotencyKey: this.readyForResumeReplay.idempotencyKey, + }); + + return; + } + } + } + #readyForCheckpoint() { this.#coordinatorSocket.socket.emit("READY_FOR_CHECKPOINT", { version: "v1" }); } @@ -630,6 +726,7 @@ class ProdWorker { this.paused = false; this.nextResumeAfter = undefined; this.waitForPostStart = false; + this.readyForResumeReplay = undefined; for (let i = 0; i < completions.length; i++) { const completion = completions[i]; @@ -845,46 +942,7 @@ class ProdWorker { } if (this.paused) { - if (!this.nextResumeAfter) { - logger.error("Missing next resume reason", { status: this.#status }); - - this.#emitUnrecoverableError( - "NoNextResume", - "Next resume reason not set while resuming from paused state" - ); - - return; - } - - if (!this.attemptFriendlyId) { - logger.error("Missing attempt friendly ID", { status: this.#status }); - - this.#emitUnrecoverableError( - "NoAttemptId", - "Attempt ID not set while resuming from paused state" - ); - - return; - } - - if (!this.attemptNumber) { - logger.error("Missing attempt number", { status: this.#status }); - - this.#emitUnrecoverableError( - "NoAttemptNumber", - "Attempt number not set while resuming from paused state" - ); - - return; - } - - socket.emit("READY_FOR_RESUME", { - version: "v2", - attemptFriendlyId: this.attemptFriendlyId, - attemptNumber: this.attemptNumber, - type: this.nextResumeAfter, - }); - + await this.#readyForResume(); return; } @@ -1293,6 +1351,9 @@ class ProdWorker { attemptNumber: this.attemptNumber, waitForTaskReplay: this.waitForTaskReplay, waitForBatchReplay: this.waitForBatchReplay, + readyForLazyAttemptReplay: this.readyForLazyAttemptReplay, + durationResumeFallback: this.durationResumeFallback, + readyForResumeReplay: this.readyForResumeReplay, }; } From 30f3d7f92306119ca6b02a280ed260c0530cda34 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Sun, 3 Nov 2024 11:53:12 +0000 Subject: [PATCH 2/3] clear replay state between attempts --- packages/cli-v3/src/entryPoints/deploy-run-controller.ts | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/packages/cli-v3/src/entryPoints/deploy-run-controller.ts b/packages/cli-v3/src/entryPoints/deploy-run-controller.ts index 098f8344d2..c06ff26ecd 100644 --- a/packages/cli-v3/src/entryPoints/deploy-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/deploy-run-controller.ts @@ -371,10 +371,18 @@ class ProdWorker { async #prepareForRetry() { // Clear state for retrying this.paused = false; + this.nextResumeAfter = undefined; this.waitForPostStart = false; this.executing = false; this.attemptFriendlyId = undefined; this.attemptNumber = undefined; + + // Clear replay state + this.waitForTaskReplay = undefined; + this.waitForBatchReplay = undefined; + this.readyForLazyAttemptReplay = undefined; + this.durationResumeFallback = undefined; + this.readyForResumeReplay = undefined; } // MARK: CHECKPOINT PREP From 58113393b08c4cb439165531865f35d2c2a67c17 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Sun, 3 Nov 2024 11:56:04 +0000 Subject: [PATCH 3/3] add changeset --- .changeset/sixty-donuts-fail.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/sixty-donuts-fail.md diff --git a/.changeset/sixty-donuts-fail.md b/.changeset/sixty-donuts-fail.md new file mode 100644 index 0000000000..c8828e3d05 --- /dev/null +++ b/.changeset/sixty-donuts-fail.md @@ -0,0 +1,5 @@ +--- +"trigger.dev": patch +--- + +Improve resume reliability by replaying ready signal of restored workers