Skip to content

Commit 4dd42cd

Browse files
authored
Retry heartbeat timeouts by putting back in the queue (#1689)
* If there’s a heartbeat error and no attempts, we put it back in the queue to try again
* When nacking, return whether it was put back in the queue or not
* Try to nack; if it fails, then fail the run
* Consolidated switch statement
* Fail executing/retrying runs
1 parent bc7d445 commit 4dd42cd

File tree

2 files changed

+37
-30
lines changed

2 files changed

+37
-30
lines changed

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -638,7 +638,8 @@ export class MarQS {
638638
}
639639

640640
/**
641-
* Negative acknowledge a message, which will requeue the message
641+
* Negative acknowledge a message, which will requeue the message.
642+
* Returns whether it went back into the queue or not.
642643
*/
643644
public async nackMessage(
644645
messageId: string,
@@ -657,7 +658,7 @@ export class MarQS {
657658
updates,
658659
service: this.name,
659660
});
660-
return;
661+
return false;
661662
}
662663

663664
const nackCount = await this.#getNackCount(messageId);
@@ -676,7 +677,7 @@ export class MarQS {
676677

677678
// If we have reached the maximum nack count, we will ack the message
678679
await this.acknowledgeMessage(messageId, "maximum nack count reached");
679-
return;
680+
return false;
680681
}
681682

682683
span.setAttributes({
@@ -705,6 +706,8 @@ export class MarQS {
705706
});
706707

707708
await this.options.subscriber?.messageNacked(message);
709+
710+
return true;
708711
},
709712
{
710713
kind: SpanKind.CONSUMER,

apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -30,27 +30,42 @@ export class TaskRunHeartbeatFailedService extends BaseService {
3030
supportsLazyAttempts: true,
3131
},
3232
},
33+
_count: {
34+
select: {
35+
attempts: true,
36+
},
37+
},
3338
},
3439
});
3540

3641
if (!taskRun) {
37-
logger.error("[RequeueTaskRunService] Task run not found", {
42+
logger.error("[TaskRunHeartbeatFailedService] Task run not found", {
3843
runId,
3944
});
4045

4146
return;
4247
}
4348

49+
const service = new FailedTaskRunService();
50+
4451
switch (taskRun.status) {
45-
case "PENDING": {
46-
if (taskRun.lockedAt) {
52+
case "PENDING":
53+
case "WAITING_TO_RESUME":
54+
case "PAUSED": {
55+
const backInQueue = await marqs?.nackMessage(taskRun.id);
56+
57+
if (backInQueue) {
4758
logger.debug(
48-
"[RequeueTaskRunService] Failing task run because the heartbeat failed and it's PENDING but locked",
59+
`[TaskRunHeartbeatFailedService] ${taskRun.status} run is back in the queue run`,
60+
{
61+
taskRun,
62+
}
63+
);
64+
} else {
65+
logger.debug(
66+
`[TaskRunHeartbeatFailedService] ${taskRun.status} run not back in the queue, failing`,
4967
{ taskRun }
5068
);
51-
52-
const service = new FailedTaskRunService();
53-
5469
await service.call(taskRun.friendlyId, {
5570
ok: false,
5671
id: taskRun.friendlyId,
@@ -61,19 +76,13 @@ export class TaskRunHeartbeatFailedService extends BaseService {
6176
message: "Did not receive a heartbeat from the worker in time",
6277
},
6378
});
64-
} else {
65-
logger.debug("[RequeueTaskRunService] Nacking task run", { taskRun });
66-
67-
await marqs?.nackMessage(taskRun.id);
6879
}
6980

7081
break;
7182
}
7283
case "EXECUTING":
7384
case "RETRYING_AFTER_FAILURE": {
74-
logger.debug("[RequeueTaskRunService] Failing task run", { taskRun });
75-
76-
const service = new FailedTaskRunService();
85+
logger.debug(`[RequeueTaskRunService] ${taskRun.status} failing task run`, { taskRun });
7786

7887
await service.call(taskRun.friendlyId, {
7988
ok: false,
@@ -90,23 +99,18 @@ export class TaskRunHeartbeatFailedService extends BaseService {
9099
}
91100
case "DELAYED":
92101
case "WAITING_FOR_DEPLOY": {
93-
logger.debug("[RequeueTaskRunService] Removing task run from queue", { taskRun });
102+
logger.debug(
103+
`[TaskRunHeartbeatFailedService] ${taskRun.status} Removing task run from queue`,
104+
{ taskRun }
105+
);
94106

95107
await marqs?.acknowledgeMessage(
96108
taskRun.id,
97-
"Run is either DELAYED or WAITING_FOR_DEPLOY so we cannot requeue it in RequeueTaskRunService"
109+
"Run is either DELAYED or WAITING_FOR_DEPLOY so we cannot requeue it in TaskRunHeartbeatFailedService"
98110
);
99111

100112
break;
101113
}
102-
case "WAITING_TO_RESUME":
103-
case "PAUSED": {
104-
logger.debug("[RequeueTaskRunService] Requeueing task run", { taskRun });
105-
106-
await marqs?.nackMessage(taskRun.id);
107-
108-
break;
109-
}
110114
case "SYSTEM_FAILURE":
111115
case "INTERRUPTED":
112116
case "CRASHED":
@@ -115,11 +119,11 @@ export class TaskRunHeartbeatFailedService extends BaseService {
115119
case "EXPIRED":
116120
case "TIMED_OUT":
117121
case "CANCELED": {
118-
logger.debug("[RequeueTaskRunService] Task run is completed", { taskRun });
122+
logger.debug("[TaskRunHeartbeatFailedService] Task run is completed", { taskRun });
119123

120124
await marqs?.acknowledgeMessage(
121125
taskRun.id,
122-
"Task run is already completed in RequeueTaskRunService"
126+
"Task run is already completed in TaskRunHeartbeatFailedService"
123127
);
124128

125129
try {
@@ -135,7 +139,7 @@ export class TaskRunHeartbeatFailedService extends BaseService {
135139
delayInMs: taskRun.lockedToVersion?.supportsLazyAttempts ? 5_000 : undefined,
136140
});
137141
} catch (error) {
138-
logger.error("[RequeueTaskRunService] Error signaling run cancellation", {
142+
logger.error("[TaskRunHeartbeatFailedService] Error signaling run cancellation", {
139143
runId: taskRun.id,
140144
error: error instanceof Error ? error.message : error,
141145
});

0 commit comments

Comments
 (0)