diff --git a/.changeset/tidy-books-smell.md b/.changeset/tidy-books-smell.md new file mode 100644 index 0000000000..b8ecf87f55 --- /dev/null +++ b/.changeset/tidy-books-smell.md @@ -0,0 +1,8 @@ +--- +"trigger.dev": patch +"@trigger.dev/core": patch +--- + +- Fix polling interval reset bug that could create duplicate intervals +- Protect against unexpected attempt number changes +- Prevent run execution zombies after warm starts \ No newline at end of file diff --git a/packages/cli-v3/src/entryPoints/managed/controller.ts b/packages/cli-v3/src/entryPoints/managed/controller.ts index d6685e8c84..f73c313d72 100644 --- a/packages/cli-v3/src/entryPoints/managed/controller.ts +++ b/packages/cli-v3/src/entryPoints/managed/controller.ts @@ -178,7 +178,17 @@ export class ManagedRunController { } const execution = async () => { - if (!this.currentExecution || !this.currentExecution.isPreparedForNextRun) { + // If we have an existing execution that isn't prepared for the next run, kill it + if (this.currentExecution && !this.currentExecution.canExecute) { + this.sendDebugLog({ + runId: runFriendlyId, + message: "killing existing execution before starting new run", + }); + await this.currentExecution.kill().catch(() => {}); + this.currentExecution = null; + } + + if (!this.currentExecution || !this.currentExecution.canExecute) { this.currentExecution = new RunExecution({ workerManifest: this.workerManifest, env: this.env, @@ -267,11 +277,12 @@ export class ManagedRunController { if (this.currentExecution?.taskRunEnv) { this.sendDebugLog({ runId: this.runFriendlyId, - message: "waitForNextRun: eagerly recreating task run process", + message: "waitForNextRun: eagerly creating fresh execution for next run", }); const previousTaskRunEnv = this.currentExecution.taskRunEnv; + // Create a fresh execution for the next run this.currentExecution = new RunExecution({ workerManifest: this.workerManifest, env: this.env, @@ -486,10 +497,32 @@ export class ManagedRunController { }); socket.on("disconnect", (reason, description) => { + const parseDescription = (): + | { + description: string; + context?: string; + } + | undefined => { + if (!description) { + return undefined; + } + + if (description instanceof Error) { + return { + description: description.toString(), + }; + } + + return { + description: description.description, + context: description.context ? String(description.context) : undefined, + }; + }; + this.sendDebugLog({ runId: this.runFriendlyId, message: "Socket disconnected from supervisor", - properties: { reason, description: description?.toString() }, + properties: { reason, ...parseDescription() }, }); }); diff --git a/packages/cli-v3/src/entryPoints/managed/execution.ts b/packages/cli-v3/src/entryPoints/managed/execution.ts index 58d41443dc..927ed409fe 100644 --- a/packages/cli-v3/src/entryPoints/managed/execution.ts +++ b/packages/cli-v3/src/entryPoints/managed/execution.ts @@ -51,6 +51,7 @@ export class RunExecution { private _runFriendlyId?: string; private currentSnapshotId?: string; + private currentAttemptNumber?: number; private currentTaskRunEnv?: Record; private dequeuedAt?: Date; @@ -65,6 +66,7 @@ export class RunExecution { private snapshotPoller?: RunExecutionSnapshotPoller; private lastHeartbeat?: Date; + private isShuttingDown = false; constructor(opts: RunExecutionOptions) { this.id = randomBytes(4).toString("hex"); @@ -86,10 +88,6 @@ export class RunExecution { throw new Error("prepareForExecution called after process was already created"); } - if (this.isPreparedForNextRun) { - throw new Error("prepareForExecution called after execution was already prepared"); - } - this.taskRunProcess = this.createTaskRunProcess({ envVars: opts.taskRunEnv, isWarmStart: true, @@ -150,9 +148,14 @@ export class RunExecution { } /** - * Returns true if the execution has been prepared with task run env. + * Returns true if no run has been started yet and the process is prepared for the next run. */ - get isPreparedForNextRun(): boolean { + get canExecute(): boolean { + // If we've ever had a run ID, this execution can't be reused + if (this._runFriendlyId) { + return false; + } + return !!this.taskRunProcess?.isPreparedForNextRun; } @@ -161,6 +164,11 @@ export class RunExecution { * or when the snapshot poller detects a change */ public async handleSnapshotChange(runData: RunExecutionData): Promise { + if (this.isShuttingDown) { + this.sendDebugLog("handleSnapshotChange: shutting down, skipping"); + return; + } + const { run, snapshot, completedWaitpoints } = runData; const snapshotMetadata = { @@ -191,8 +199,6 @@ export class RunExecution { return; } - this.sendDebugLog(`enqueued snapshot change: ${snapshot.executionStatus}`, snapshotMetadata); - this.snapshotChangeQueue.push(runData); await this.processSnapshotChangeQueue(); } @@ -240,11 +246,16 @@ export class RunExecution { } if (snapshot.friendlyId === this.currentSnapshotId) { - this.sendDebugLog("handleSnapshotChange: snapshot not changed", snapshotMetadata); return; } - this.sendDebugLog(`snapshot change: ${snapshot.executionStatus}`, snapshotMetadata); + if (this.currentAttemptNumber && this.currentAttemptNumber !== run.attemptNumber) { + this.sendDebugLog("ERROR: attempt number mismatch", snapshotMetadata); + await this.taskRunProcess?.suspend(); + return; + } + + this.sendDebugLog(`snapshot has changed to: ${snapshot.executionStatus}`, snapshotMetadata); // Reset the snapshot poll interval so we don't do unnecessary work this.snapshotPoller?.resetCurrentInterval(); @@ -456,6 +467,16 @@ export class RunExecution { // A snapshot was just created, so update the snapshot ID this.currentSnapshotId = start.data.snapshot.friendlyId; + // Also set or update the attempt number - we do this to detect illegal attempt number changes, e.g. from stalled runners coming back online + const attemptNumber = start.data.run.attemptNumber; + if (attemptNumber && attemptNumber > 0) { + this.currentAttemptNumber = attemptNumber; + } else { + this.sendDebugLog("ERROR: invalid attempt number returned from start attempt", { + attemptNumber: String(attemptNumber), + }); + } + const metrics = this.measureExecutionMetrics({ attemptCreatedAt: attemptStartedAt, dequeuedAt: this.dequeuedAt?.getTime(), @@ -597,8 +618,18 @@ export class RunExecution { metrics: TaskRunExecutionMetrics; isWarmStart?: boolean; }) { + // For immediate retries, we need to ensure the task run process is prepared for the next attempt + if ( + this.runFriendlyId && + this.taskRunProcess && + !this.taskRunProcess.isPreparedForNextAttempt + ) { + this.sendDebugLog("killing existing task run process before executing next attempt"); + await this.kill().catch(() => {}); + } + // To skip this step and eagerly create the task run process, run prepareForExecution first - if (!this.taskRunProcess || !this.isPreparedForNextRun) { + if (!this.taskRunProcess || !this.taskRunProcess.isPreparedForNextRun) { this.taskRunProcess = this.createTaskRunProcess({ envVars, isWarmStart }); } @@ -655,11 +686,15 @@ export class RunExecution { } public exit() { - if (this.isPreparedForNextRun) { + if (this.taskRunProcess?.isPreparedForNextRun) { this.taskRunProcess?.forceExit(); } } + public async kill() { + await this.taskRunProcess?.kill("SIGKILL"); + } + private async complete({ completion }: { completion: TaskRunExecutionResult }): Promise { if (!this.runFriendlyId || !this.currentSnapshotId) { throw new Error("Cannot complete run: missing run or snapshot ID"); @@ -897,7 +932,7 @@ export class RunExecution { this.lastHeartbeat = new Date(); } - sendDebugLog( + private sendDebugLog( message: string, properties?: SendDebugLogOptions["properties"], runIdOverride?: string @@ -958,6 +993,11 @@ export class RunExecution { } private stopServices() { + if (this.isShuttingDown) { + return; + } + + this.isShuttingDown = true; this.snapshotPoller?.stop(); this.taskRunProcess?.onTaskRunHeartbeat.detach(); } diff --git a/packages/cli-v3/src/entryPoints/managed/poller.ts b/packages/cli-v3/src/entryPoints/managed/poller.ts index 2decd401ee..814833846e 100644 --- a/packages/cli-v3/src/entryPoints/managed/poller.ts +++ b/packages/cli-v3/src/entryPoints/managed/poller.ts @@ -1,5 +1,5 @@ import { WorkloadHttpClient } from "@trigger.dev/core/v3/runEngineWorker"; -import { RunLogger } from "./logger.js"; +import { RunLogger, SendDebugLogOptions } from "./logger.js"; import { IntervalService, RunExecutionData } from "@trigger.dev/core/v3"; export type RunExecutionSnapshotPollerOptions = { @@ -14,89 +14,70 @@ export type RunExecutionSnapshotPollerOptions = { export class RunExecutionSnapshotPoller { private runFriendlyId: string; private snapshotFriendlyId: string; + private enabled: boolean; private readonly httpClient: WorkloadHttpClient; private readonly logger: RunLogger; - private readonly snapshotPollIntervalMs: number; private readonly handleSnapshotChange: (runData: RunExecutionData) => Promise; private readonly poller: IntervalService; constructor(opts: RunExecutionSnapshotPollerOptions) { + this.enabled = false; + this.runFriendlyId = opts.runFriendlyId; this.snapshotFriendlyId = opts.snapshotFriendlyId; this.httpClient = opts.httpClient; this.logger = opts.logger; - this.snapshotPollIntervalMs = opts.snapshotPollIntervalSeconds * 1000; this.handleSnapshotChange = opts.handleSnapshotChange; - this.logger.sendDebugLog({ - runId: this.runFriendlyId, - message: "RunExecutionSnapshotPoller", - properties: { - runFriendlyId: this.runFriendlyId, - snapshotFriendlyId: this.snapshotFriendlyId, - snapshotPollIntervalSeconds: opts.snapshotPollIntervalSeconds, - }, - }); + const intervalMs = opts.snapshotPollIntervalSeconds * 1000; this.poller = new IntervalService({ onInterval: async () => { - if (!this.runFriendlyId) { - this.logger.sendDebugLog({ - runId: this.runFriendlyId, - message: "Skipping snapshot poll, no run ID", - }); + if (!this.enabled) { + this.sendDebugLog("poller disabled, skipping snapshot change handler (pre)"); return; } - this.logger.sendDebugLog({ - runId: this.runFriendlyId, - message: "Polling for latest snapshot", - }); - - this.logger.sendDebugLog({ - runId: this.runFriendlyId, - message: `snapshot poll: started`, - properties: { - snapshotId: this.snapshotFriendlyId, - }, - }); + this.sendDebugLog("polling for latest snapshot"); const response = await this.httpClient.getRunExecutionData(this.runFriendlyId); if (!response.success) { - this.logger.sendDebugLog({ - runId: this.runFriendlyId, - message: "Snapshot poll failed", - properties: { - error: response.error, - }, - }); - - this.logger.sendDebugLog({ - runId: this.runFriendlyId, - message: `snapshot poll: failed`, - properties: { - snapshotId: this.snapshotFriendlyId, - error: response.error, - }, - }); + this.sendDebugLog("failed to get run execution data", { error: response.error }); + return; + } + if (!this.enabled) { + this.sendDebugLog("poller disabled, skipping snapshot change handler (post)"); return; } await this.handleSnapshotChange(response.data.execution); }, - intervalMs: this.snapshotPollIntervalMs, + intervalMs, leadingEdge: false, onError: async (error) => { - this.logger.sendDebugLog({ - runId: this.runFriendlyId, - message: "Failed to poll for snapshot", - properties: { error: error instanceof Error ? error.message : String(error) }, + this.sendDebugLog("failed to poll for snapshot", { + error: error instanceof Error ? error.message : String(error), }); }, }); + + this.sendDebugLog("created"); + } + + private sendDebugLog(message: string, properties?: SendDebugLogOptions["properties"]) { + this.logger.sendDebugLog({ + runId: this.runFriendlyId, + message: `[poller] ${message}`, + properties: { + ...properties, + runId: this.runFriendlyId, + snapshotId: this.snapshotFriendlyId, + pollIntervalMs: this.poller.intervalMs, + }, + }); } resetCurrentInterval() { @@ -112,10 +93,22 @@ export class RunExecutionSnapshotPoller { } start() { + if (this.enabled) { + this.sendDebugLog("already started"); + return; + } + + this.enabled = true; this.poller.start(); } stop() { + if (!this.enabled) { + this.sendDebugLog("already stopped"); + return; + } + + this.enabled = false; this.poller.stop(); } } diff --git a/packages/cli-v3/src/executions/taskRunProcess.ts b/packages/cli-v3/src/executions/taskRunProcess.ts index cb24dac9f6..ae95ecb109 100644 --- a/packages/cli-v3/src/executions/taskRunProcess.ts +++ b/packages/cli-v3/src/executions/taskRunProcess.ts @@ -89,15 +89,21 @@ export class TaskRunProcess { public onWait: Evt = new Evt(); private _isPreparedForNextRun: boolean = false; + private _isPreparedForNextAttempt: boolean = false; constructor(public readonly options: TaskRunProcessOptions) { this._isPreparedForNextRun = true; + this._isPreparedForNextAttempt = true; } get isPreparedForNextRun() { return this._isPreparedForNextRun; } + get isPreparedForNextAttempt() { + return this._isPreparedForNextAttempt; + } + async cancel() { this._isPreparedForNextRun = false; this._isBeingCancelled = true; @@ -223,6 +229,7 @@ export class TaskRunProcess { isWarmStart?: boolean ): Promise { this._isPreparedForNextRun = false; + this._isPreparedForNextAttempt = false; let resolver: (value: TaskRunExecutionResult) => void; let rejecter: (err?: any) => void; @@ -266,6 +273,7 @@ export class TaskRunProcess { const result = await promise; this._currentExecution = undefined; + this._isPreparedForNextAttempt = true; return result; } diff --git a/packages/core/src/v3/index.ts b/packages/core/src/v3/index.ts index 8877393dca..3bd1fc4547 100644 --- a/packages/core/src/v3/index.ts +++ b/packages/core/src/v3/index.ts @@ -68,7 +68,6 @@ export { export * from "./utils/imageRef.js"; export * from "./utils/interval.js"; -export * from "./utils/heartbeat.js"; export * from "./config.js"; export { diff --git a/packages/core/src/v3/utils/heartbeat.ts b/packages/core/src/v3/utils/heartbeat.ts deleted file mode 100644 index c9bb0d97ed..0000000000 --- a/packages/core/src/v3/utils/heartbeat.ts +++ /dev/null @@ -1,96 +0,0 @@ -type HeartbeatServiceOptions = { - heartbeat: () => Promise; - intervalMs?: number; - leadingEdge?: boolean; - onError?: (error: unknown) => Promise; -}; - -/** - * @deprecated Use IntervalService instead - */ -export class HeartbeatService { - private _heartbeat: () => Promise; - private _intervalMs: number; - private _nextHeartbeat: NodeJS.Timeout | undefined; - private _leadingEdge: boolean; - private _isHeartbeating: boolean; - private _onError?: (error: unknown) => Promise; - - constructor(opts: HeartbeatServiceOptions) { - this._heartbeat = opts.heartbeat; - this._intervalMs = opts.intervalMs ?? 45_000; - this._nextHeartbeat = undefined; - this._leadingEdge = opts.leadingEdge ?? false; - this._isHeartbeating = false; - this._onError = opts.onError; - } - - start() { - if (this._isHeartbeating) { - return; - } - - this._isHeartbeating = true; - - if (this._leadingEdge) { - this.#doHeartbeat(); - } else { - this.#scheduleNextHeartbeat(); - } - } - - stop() { - if (!this._isHeartbeating) { - return; - } - - this._isHeartbeating = false; - this.#clearNextHeartbeat(); - } - - resetCurrentInterval() { - if (!this._isHeartbeating) { - return; - } - - this.#clearNextHeartbeat(); - this.#scheduleNextHeartbeat(); - } - - updateInterval(intervalMs: number) { - this._intervalMs = intervalMs; - this.resetCurrentInterval(); - } - - #doHeartbeat = async () => { - this.#clearNextHeartbeat(); - - if (!this._isHeartbeating) { - return; - } - - try { - await this._heartbeat(); - } catch (error) { - if (this._onError) { - try { - await this._onError(error); - } catch (error) { - console.error("Error handling heartbeat error", error); - } - } - } - - this.#scheduleNextHeartbeat(); - }; - - #clearNextHeartbeat() { - if (this._nextHeartbeat) { - clearTimeout(this._nextHeartbeat); - } - } - - #scheduleNextHeartbeat() { - this._nextHeartbeat = setTimeout(this.#doHeartbeat, this._intervalMs); - } -} diff --git a/packages/core/src/v3/utils/interval.ts b/packages/core/src/v3/utils/interval.ts index 59fd0a94cb..9470ae2bb2 100644 --- a/packages/core/src/v3/utils/interval.ts +++ b/packages/core/src/v3/utils/interval.ts @@ -13,6 +13,7 @@ export class IntervalService { private _nextInterval: NodeJS.Timeout | undefined; private _leadingEdge: boolean; private _isEnabled: boolean; + private _isExecuting: boolean; constructor(opts: IntervalServiceOptions) { this._onInterval = opts.onInterval; @@ -22,6 +23,7 @@ export class IntervalService { this._nextInterval = undefined; this._leadingEdge = opts.leadingEdge ?? false; this._isEnabled = false; + this._isExecuting = false; } start() { @@ -52,6 +54,10 @@ export class IntervalService { return; } + if (this._isExecuting) { + return; + } + this.#clearNextInterval(); this.#scheduleNextInterval(); } @@ -61,6 +67,10 @@ export class IntervalService { this.resetCurrentInterval(); } + get intervalMs() { + return this._intervalMs; + } + #doInterval = async () => { this.#clearNextInterval(); @@ -68,6 +78,13 @@ export class IntervalService { return; } + if (this._isExecuting) { + console.error("Interval handler already running, skipping"); + return; + } + + this._isExecuting = true; + try { await this._onInterval(); } catch (error) { @@ -78,9 +95,10 @@ export class IntervalService { console.error("Error during interval error handler", error); } } + } finally { + this.#scheduleNextInterval(); + this._isExecuting = false; } - - this.#scheduleNextInterval(); }; #clearNextInterval() {