From f06e0c26db5651ff30a9fe561259b5cc9771051f Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Mon, 10 Feb 2025 12:58:56 +0000 Subject: [PATCH 1/5] =?UTF-8?q?If=20there=E2=80=99s=20a=20heartbeat=20erro?= =?UTF-8?q?r=20and=20no=20attempts=20we=20put=20it=20back=20in=20the=20que?= =?UTF-8?q?ue=20to=20try=20again?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../app/v3/taskRunHeartbeatFailed.server.ts | 48 ++++++++++++------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts b/apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts index d722a6f80b..1e54d1db25 100644 --- a/apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts +++ b/apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts @@ -30,6 +30,11 @@ export class TaskRunHeartbeatFailedService extends BaseService { supportsLazyAttempts: true, }, }, + _count: { + select: { + attempts: true, + }, + }, }, }); @@ -44,23 +49,32 @@ export class TaskRunHeartbeatFailedService extends BaseService { switch (taskRun.status) { case "PENDING": { if (taskRun.lockedAt) { - logger.debug( - "[RequeueTaskRunService] Failing task run because the heartbeat failed and it's PENDING but locked", - { taskRun } - ); - - const service = new FailedTaskRunService(); - - await service.call(taskRun.friendlyId, { - ok: false, - id: taskRun.friendlyId, - retry: undefined, - error: { - type: "INTERNAL_ERROR", - code: TaskRunErrorCodes.TASK_RUN_HEARTBEAT_TIMEOUT, - message: "Did not receive a heartbeat from the worker in time", - }, - }); + if (taskRun._count.attempts === 0) { + //no attempts, so we can requeue + logger.debug("[RequeueTaskRunService] Requeueing task run, there were no attempts.", { + taskRun, + }); + + await marqs?.nackMessage(taskRun.id); + } else { + logger.debug( + "[RequeueTaskRunService] Failing task run because the heartbeat failed, it's PENDING, locked, and has attempts", + { taskRun } + ); + + const service = new FailedTaskRunService(); + + await service.call(taskRun.friendlyId, { + ok: false, + id: taskRun.friendlyId, + retry: undefined, + error: { + type: "INTERNAL_ERROR", + code: TaskRunErrorCodes.TASK_RUN_HEARTBEAT_TIMEOUT, + message: "Did not receive a heartbeat from the worker in time", + }, + }); + } } else { logger.debug("[RequeueTaskRunService] Nacking task run", { taskRun }); From 775a078c50d3af5489951e73f14fa26d34ac0a42 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Mon, 10 Feb 2025 13:23:33 +0000 Subject: [PATCH 2/5] When nacking, return whether it was put back in the queue or not --- apps/webapp/app/v3/marqs/index.server.ts | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/apps/webapp/app/v3/marqs/index.server.ts b/apps/webapp/app/v3/marqs/index.server.ts index dfe5a6028d..341f003464 100644 --- a/apps/webapp/app/v3/marqs/index.server.ts +++ b/apps/webapp/app/v3/marqs/index.server.ts @@ -638,7 +638,8 @@ export class MarQS { } /** - * Negative acknowledge a message, which will requeue the message + * Negative acknowledge a message, which will requeue the message. + * Returns whether it went back into the queue or not. */ public async nackMessage( messageId: string, @@ -657,7 +658,7 @@ export class MarQS { updates, service: this.name, }); - return; + return false; } const nackCount = await this.#getNackCount(messageId); @@ -676,7 +677,7 @@ export class MarQS { // If we have reached the maximum nack count, we will ack the message await this.acknowledgeMessage(messageId, "maximum nack count reached"); - return; + return false; } span.setAttributes({ @@ -705,6 +706,8 @@ export class MarQS { }); await this.options.subscriber?.messageNacked(message); + + return true; }, { kind: SpanKind.CONSUMER, From bec58eb034e9f2bbda0be93d35f062573513a865 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Mon, 10 Feb 2025 13:23:43 +0000 Subject: [PATCH 3/5] Try and nack, if it fails then fail the run --- .../app/v3/taskRunHeartbeatFailed.server.ts | 130 +++++++++++------- 1 file changed, 79 insertions(+), 51 deletions(-) diff --git a/apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts b/apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts index 1e54d1db25..bca40b001b 100644 --- a/apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts +++ b/apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts @@ -39,85 +39,113 @@ export class TaskRunHeartbeatFailedService extends BaseService { }); if (!taskRun) { - logger.error("[RequeueTaskRunService] Task run not found", { + logger.error("[TaskRunHeartbeatFailedService] Task run not found", { runId, }); return; } + const service = new FailedTaskRunService(); + switch (taskRun.status) { case "PENDING": { - if (taskRun.lockedAt) { - if (taskRun._count.attempts === 0) { - //no attempts, so we can requeue - logger.debug("[RequeueTaskRunService] Requeueing task run, there were no attempts.", { + const backInQueue = await marqs?.nackMessage(taskRun.id); + + if (backInQueue) { + logger.debug( + `[TaskRunHeartbeatFailedService] ${taskRun.status} run is back in the queue run`, + { taskRun, - }); - - await marqs?.nackMessage(taskRun.id); - } else { - logger.debug( - "[RequeueTaskRunService] Failing task run because the heartbeat failed, it's PENDING, locked, and has attempts", - { taskRun } - ); - - const service = new FailedTaskRunService(); - - await service.call(taskRun.friendlyId, { - ok: false, - id: taskRun.friendlyId, - retry: undefined, - error: { - type: "INTERNAL_ERROR", - code: TaskRunErrorCodes.TASK_RUN_HEARTBEAT_TIMEOUT, - message: "Did not receive a heartbeat from the worker in time", - }, - }); - } + } + ); } else { - logger.debug("[RequeueTaskRunService] Nacking task run", { taskRun }); - - await marqs?.nackMessage(taskRun.id); + logger.debug( + `[TaskRunHeartbeatFailedService] ${taskRun.status} run not back in the queue, failing`, + { taskRun } + ); + await service.call(taskRun.friendlyId, { + ok: false, + id: taskRun.friendlyId, + retry: undefined, + error: { + type: "INTERNAL_ERROR", + code: TaskRunErrorCodes.TASK_RUN_HEARTBEAT_TIMEOUT, + message: "Did not receive a heartbeat from the worker in time", + }, + }); } break; } case "EXECUTING": case "RETRYING_AFTER_FAILURE": { - logger.debug("[RequeueTaskRunService] Failing task run", { taskRun }); - - const service = new FailedTaskRunService(); - - await service.call(taskRun.friendlyId, { - ok: false, - id: taskRun.friendlyId, - retry: undefined, - error: { - type: "INTERNAL_ERROR", - code: TaskRunErrorCodes.TASK_RUN_HEARTBEAT_TIMEOUT, - message: "Did not receive a heartbeat from the worker in time", - }, - }); + const backInQueue = await marqs?.nackMessage(taskRun.id); + + if (backInQueue) { + logger.debug( + `[TaskRunHeartbeatFailedService] ${taskRun.status} run is back in the queue run`, + { + taskRun, + } + ); + } else { + logger.debug( + `[TaskRunHeartbeatFailedService] ${taskRun.status} run not back in the queue, failing`, + { taskRun } + ); + await service.call(taskRun.friendlyId, { + ok: false, + id: taskRun.friendlyId, + retry: undefined, + error: { + type: "INTERNAL_ERROR", + code: TaskRunErrorCodes.TASK_RUN_HEARTBEAT_TIMEOUT, + message: "Did not receive a heartbeat from the worker in time", + }, + }); + } break; } case "DELAYED": case "WAITING_FOR_DEPLOY": { - logger.debug("[RequeueTaskRunService] Removing task run from queue", { taskRun }); + logger.debug("[TaskRunHeartbeatFailedService] Removing task run from queue", { taskRun }); await marqs?.acknowledgeMessage( taskRun.id, - "Run is either DELAYED or WAITING_FOR_DEPLOY so we cannot requeue it in RequeueTaskRunService" + "Run is either DELAYED or WAITING_FOR_DEPLOY so we cannot requeue it in TaskRunHeartbeatFailedService" ); break; } case "WAITING_TO_RESUME": case "PAUSED": { - logger.debug("[RequeueTaskRunService] Requeueing task run", { taskRun }); + const backInQueue = await marqs?.nackMessage(taskRun.id); - await marqs?.nackMessage(taskRun.id); + if (backInQueue) { + logger.debug( + `[TaskRunHeartbeatFailedService] ${taskRun.status} run is back in the queue run`, + { + taskRun, + } + ); + } else { + logger.debug( + `[TaskRunHeartbeatFailedService] ${taskRun.status} run not back in the queue, failing`, + { taskRun } + ); + await service.call(taskRun.friendlyId, { + ok: false, + id: taskRun.friendlyId, + retry: undefined, + error: { + type: "INTERNAL_ERROR", + code: TaskRunErrorCodes.TASK_RUN_HEARTBEAT_TIMEOUT, + message: "Did not receive a heartbeat from the worker in time", + }, + }); + } break; } @@ -129,11 +157,11 @@ export class TaskRunHeartbeatFailedService extends BaseService { case "EXPIRED": case "TIMED_OUT": case "CANCELED": { - logger.debug("[RequeueTaskRunService] Task run is completed", { taskRun }); + logger.debug("[TaskRunHeartbeatFailedService] Task run is completed", { taskRun }); await marqs?.acknowledgeMessage( taskRun.id, - "Task run is already completed in RequeueTaskRunService" + "Task run is already completed in TaskRunHeartbeatFailedService" ); try { @@ -149,7 +177,7 @@ export class TaskRunHeartbeatFailedService extends BaseService { delayInMs: taskRun.lockedToVersion?.supportsLazyAttempts ? 5_000 : undefined, }); } catch (error) { - logger.error("[RequeueTaskRunService] Error signaling run cancellation", { + logger.error("[TaskRunHeartbeatFailedService] Error signaling run cancellation", { runId: taskRun.id, error: error instanceof Error ? error.message : error, }); From adacc885672b0dcf3d85153acfa4ab2d0ccfc3a3 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Mon, 10 Feb 2025 13:33:06 +0000 Subject: [PATCH 4/5] Consolidated switch statement --- .../app/v3/taskRunHeartbeatFailed.server.ts | 64 ++----------------- 1 file changed, 4 insertions(+), 60 deletions(-) diff --git a/apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts b/apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts index bca40b001b..ae6b4ca375 100644 --- a/apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts +++ b/apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts @@ -49,37 +49,11 @@ export class TaskRunHeartbeatFailedService extends BaseService { const service = new FailedTaskRunService(); switch (taskRun.status) { - case "PENDING": { - const backInQueue = await marqs?.nackMessage(taskRun.id); - - if (backInQueue) { - logger.debug( - `[TaskRunHeartbeatFailedService] ${taskRun.status} run is back in the queue run`, - { - taskRun, - } - ); - } else { - logger.debug( - `[TaskRunHeartbeatFailedService] ${taskRun.status} run not back in the queue, failing`, - { taskRun } - ); - await service.call(taskRun.friendlyId, { - ok: false, - id: taskRun.friendlyId, - retry: undefined, - error: { - type: "INTERNAL_ERROR", - code: TaskRunErrorCodes.TASK_RUN_HEARTBEAT_TIMEOUT, - message: "Did not receive a heartbeat from the worker in time", - }, - }); - } - - break; - } + case "PENDING": case "EXECUTING": - case "RETRYING_AFTER_FAILURE": { + case "RETRYING_AFTER_FAILURE": + case "WAITING_TO_RESUME": + case "PAUSED": { const backInQueue = await marqs?.nackMessage(taskRun.id); if (backInQueue) { @@ -119,36 +93,6 @@ export class TaskRunHeartbeatFailedService extends BaseService { break; } - case "WAITING_TO_RESUME": - case "PAUSED": { - const backInQueue = await marqs?.nackMessage(taskRun.id); - - if (backInQueue) { - logger.debug( - `[TaskRunHeartbeatFailedService] ${taskRun.status} run is back in the queue run`, - { - taskRun, - } - ); - } else { - logger.debug( - `[TaskRunHeartbeatFailedService] ${taskRun.status} run not back in the queue, failing`, - { taskRun } - ); - await service.call(taskRun.friendlyId, { - ok: false, - id: taskRun.friendlyId, - retry: undefined, - error: { - type: "INTERNAL_ERROR", - code: TaskRunErrorCodes.TASK_RUN_HEARTBEAT_TIMEOUT, - message: "Did not receive a heartbeat from the worker in time", - }, - }); - } - - break; - } case "SYSTEM_FAILURE": case "INTERRUPTED": case "CRASHED": From 30aea1fb8097cef5a13040574b6320cd3479c00f Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Mon, 10 Feb 2025 13:37:10 +0000 Subject: [PATCH 5/5] Fail executing/retrying runs --- .../app/v3/taskRunHeartbeatFailed.server.ts | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts b/apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts index ae6b4ca375..fa564c06b8 100644 --- a/apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts +++ b/apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts @@ -50,8 +50,6 @@ export class TaskRunHeartbeatFailedService extends BaseService { switch (taskRun.status) { case "PENDING": - case "EXECUTING": - case "RETRYING_AFTER_FAILURE": case "WAITING_TO_RESUME": case "PAUSED": { const backInQueue = await marqs?.nackMessage(taskRun.id); @@ -82,9 +80,29 @@ export class TaskRunHeartbeatFailedService extends BaseService { break; } + case "EXECUTING": + case "RETRYING_AFTER_FAILURE": { + logger.debug(`[RequeueTaskRunService] ${taskRun.status} failing task run`, { taskRun }); + + await service.call(taskRun.friendlyId, { + ok: false, + id: taskRun.friendlyId, + retry: undefined, + error: { + type: "INTERNAL_ERROR", + code: TaskRunErrorCodes.TASK_RUN_HEARTBEAT_TIMEOUT, + message: "Did not receive a heartbeat from the worker in time", + }, + }); + + break; + } case "DELAYED": case "WAITING_FOR_DEPLOY": { - logger.debug("[TaskRunHeartbeatFailedService] Removing task run from queue", { taskRun }); + logger.debug( + `[TaskRunHeartbeatFailedService] ${taskRun.status} Removing task run from queue`, + { taskRun } + ); await marqs?.acknowledgeMessage( taskRun.id,