Skip to content

Commit efd970a

Browse files
authored
v3: correctly handle triggering tasks prior to deploy (#1019)
* add waiting for deploy status
* sort tasks filter
* marqs: optional timestamp on enqueue
* add new status to consts
* detect and execute tasks waiting for deploy
* display runs list even without any deploys
* ensure we only execute after the run creation tx has completed
* ensure correct execution order after waiting for deploy
1 parent 89a5e9f commit efd970a

File tree

13 files changed

+253
-44
lines changed

13 files changed

+253
-44
lines changed

apps/webapp/app/components/runs/v3/RunFilters.tsx

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import { TaskRunStatusCombo, descriptionForTaskRunStatus } from "./TaskRunStatus
2727

2828
export const allTaskRunStatuses = [
2929
"PENDING",
30+
"WAITING_FOR_DEPLOY",
3031
"EXECUTING",
3132
"RETRYING_AFTER_FAILURE",
3233
"WAITING_TO_RESUME",
@@ -228,16 +229,18 @@ export function RunsFilters({ possibleEnvironments, possibleTasks }: RunFiltersP
228229
All tasks
229230
</Paragraph>
230231
</SelectItem>
231-
{possibleTasks.map((task) => (
232-
<SelectItem key={task} value={task}>
233-
<Paragraph
234-
variant="extra-small"
235-
className="pl-0.5 transition group-hover:text-text-bright"
236-
>
237-
{task}
238-
</Paragraph>
239-
</SelectItem>
240-
))}
232+
{possibleTasks
233+
.sort((a, b) => a.localeCompare(b)) // 🔤
234+
.map((task) => (
235+
<SelectItem key={task} value={task}>
236+
<Paragraph
237+
variant="extra-small"
238+
className="pl-0.5 transition group-hover:text-text-bright"
239+
>
240+
{task}
241+
</Paragraph>
242+
</SelectItem>
243+
))}
241244
</SelectContent>
242245
</Select>
243246
</SelectGroup>

apps/webapp/app/components/runs/v3/TaskRunStatus.tsx

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import { cn } from "~/utils/cn";
1616

1717
const taskRunStatusDescriptions: Record<TaskRunStatus, string> = {
1818
PENDING: "Task is waiting to be executed",
19+
WAITING_FOR_DEPLOY: "Task needs to be deployed first to start executing",
1920
EXECUTING: "Task is currently being executed",
2021
RETRYING_AFTER_FAILURE: "Task is being reattempted after a failure",
2122
WAITING_TO_RESUME: "Task has been frozen and is waiting to be resumed",
@@ -63,6 +64,8 @@ export function TaskRunStatusIcon({
6364
switch (status) {
6465
case "PENDING":
6566
return <RectangleStackIcon className={cn(runStatusClassNameColor(status), className)} />;
67+
case "WAITING_FOR_DEPLOY":
68+
return <RectangleStackIcon className={cn(runStatusClassNameColor(status), className)} />;
6669
case "EXECUTING":
6770
return <Spinner className={cn(runStatusClassNameColor(status), className)} />;
6871
case "WAITING_TO_RESUME":
@@ -95,6 +98,8 @@ export function runStatusClassNameColor(status: TaskRunStatus): string {
9598
switch (status) {
9699
case "PENDING":
97100
return "text-charcoal-500";
101+
case "WAITING_FOR_DEPLOY":
102+
return "text-amber-500";
98103
case "EXECUTING":
99104
case "RETRYING_AFTER_FAILURE":
100105
return "text-pending";
@@ -125,6 +130,8 @@ export function runStatusTitle(status: TaskRunStatus): string {
125130
switch (status) {
126131
case "PENDING":
127132
return "Queued";
133+
case "WAITING_FOR_DEPLOY":
134+
return "Waiting for deploy";
128135
case "EXECUTING":
129136
return "Executing";
130137
case "WAITING_TO_RESUME":

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.v3.$projectParam.runs._index/route.tsx

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,12 @@ export default function Page() {
6464
<PageTitle title="Runs" />
6565
</NavBar>
6666
<PageBody>
67-
{list.possibleTasks.length === 0 ? (
68-
<CreateFirstTaskInstructions />
69-
) : list.runs.length === 0 && !list.hasFilters ? (
70-
<RunTaskInstructions />
67+
{list.runs.length === 0 && !list.hasFilters ? (
68+
list.possibleTasks.length === 0 ? (
69+
<CreateFirstTaskInstructions />
70+
) : (
71+
<RunTaskInstructions />
72+
)
7173
) : (
7274
<div className={cn("grid h-fit grid-cols-1 gap-4")}>
7375
<div>

apps/webapp/app/services/worker.server.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import { ResumeBatchRunService } from "~/v3/services/resumeBatchRun.server";
3535
import { ResumeTaskDependencyService } from "~/v3/services/resumeTaskDependency.server";
3636
import { TimeoutDeploymentService } from "~/v3/services/timeoutDeployment.server";
3737
import { eventRepository } from "~/v3/eventRepository.server";
38+
import { ExecuteTasksWaitingForDeployService } from "~/v3/services/executeTasksWaitingForDeploy";
3839

3940
const workerCatalog = {
4041
indexEndpoint: z.object({
@@ -128,6 +129,9 @@ const workerCatalog = {
128129
fromStatus: z.string(),
129130
errorMessage: z.string(),
130131
}),
132+
"v3.executeTasksWaitingForDeploy": z.object({
133+
backgroundWorkerId: z.string(),
134+
}),
131135
};
132136

133137
const executionWorkerCatalog = {
@@ -507,6 +511,15 @@ function getWorkerQueue() {
507511
return await service.call(payload.deploymentId, payload.fromStatus, payload.errorMessage);
508512
},
509513
},
514+
"v3.executeTasksWaitingForDeploy": {
515+
priority: 0,
516+
maxAttempts: 5,
517+
handler: async (payload, job) => {
518+
const service = new ExecuteTasksWaitingForDeployService();
519+
520+
return await service.call(payload.backgroundWorkerId);
521+
},
522+
},
510523
},
511524
});
512525
}

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,15 +92,14 @@ export class MarQS {
9292
queue: string,
9393
messageId: string,
9494
messageData: Record<string, unknown>,
95-
concurrencyKey?: string
95+
concurrencyKey?: string,
96+
timestamp?: number
9697
) {
9798
return await this.#trace(
9899
"enqueueMessage",
99100
async (span) => {
100101
const messageQueue = this.keys.queueKey(env, queue, concurrencyKey);
101102

102-
const timestamp = Date.now();
103-
104103
const parentQueue = this.keys.envSharedQueueKey(env);
105104

106105
propagation.inject(context.active(), messageData);
@@ -110,7 +109,7 @@ export class MarQS {
110109
data: messageData,
111110
queue: messageQueue,
112111
concurrencyKey,
113-
timestamp,
112+
timestamp: timestamp ?? Date.now(),
114113
messageId,
115114
parentQueue,
116115
};

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 63 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -242,10 +242,12 @@ export class SharedQueueConsumer {
242242
error: messageBody.error,
243243
});
244244

245-
this.#ackAndDoMoreWork(message.messageId);
245+
await this.#ackAndDoMoreWork(message.messageId);
246246
return;
247247
}
248248

249+
// TODO: For every ACK, decide what should be done with the existing run and attempts. Make sure to check the current statuses first.
250+
249251
switch (messageBody.data.type) {
250252
case "EXECUTE": {
251253
const existingTaskRun = await prisma.taskRun.findUnique({
@@ -260,7 +262,10 @@ export class SharedQueueConsumer {
260262
messageId: message.messageId,
261263
});
262264

263-
this.#ackAndDoMoreWork(message.messageId);
265+
// INFO: There used to be a race condition where tasks could be triggered, but execute messages could be dequeued before the run finished being created in the DB
266+
// This should not be happening anymore. In case it does, consider requeueing here with a brief delay while limiting total retries.
267+
268+
await this.#ackAndDoMoreWork(message.messageId);
264269
return;
265270
}
266271

@@ -288,7 +293,7 @@ export class SharedQueueConsumer {
288293
retryingFromCheckpoint,
289294
});
290295

291-
this.#ackAndDoMoreWork(message.messageId);
296+
await this.#ackAndDoMoreWork(message.messageId);
292297
return;
293298
}
294299

@@ -300,7 +305,9 @@ export class SharedQueueConsumer {
300305
messageId: message.messageId,
301306
});
302307

303-
this.#ackAndDoMoreWork(message.messageId);
308+
await this.#markRunAsWaitingForDeploy(existingTaskRun.id);
309+
310+
await this.#ackAndDoMoreWork(message.messageId);
304311
return;
305312
}
306313

@@ -311,7 +318,9 @@ export class SharedQueueConsumer {
311318
deployment: deployment.id,
312319
});
313320

314-
this.#ackAndDoMoreWork(message.messageId);
321+
await this.#markRunAsWaitingForDeploy(existingTaskRun.id);
322+
323+
await this.#ackAndDoMoreWork(message.messageId);
315324
return;
316325
}
317326

@@ -320,15 +329,39 @@ export class SharedQueueConsumer {
320329
);
321330

322331
if (!backgroundTask) {
323-
logger.warn("No matching background task found for task run", {
324-
taskRun: existingTaskRun.id,
325-
taskIdentifier: existingTaskRun.taskIdentifier,
326-
deployment: deployment.id,
327-
backgroundWorker: deployment.worker.id,
328-
taskSlugs: deployment.worker.tasks.map((task) => task.slug),
332+
const nonCurrentTask = await prisma.backgroundWorkerTask.findFirst({
333+
where: {
334+
slug: existingTaskRun.taskIdentifier,
335+
projectId: existingTaskRun.projectId,
336+
runtimeEnvironmentId: existingTaskRun.runtimeEnvironmentId,
337+
},
338+
include: {
339+
worker: {
340+
include: {
341+
deployment: {
342+
include: {},
343+
},
344+
},
345+
},
346+
},
329347
});
330348

331-
this.#ackAndDoMoreWork(message.messageId);
349+
if (nonCurrentTask) {
350+
logger.warn("Task for this run exists but is not part of the current deploy", {
351+
taskRun: existingTaskRun.id,
352+
taskIdentifier: existingTaskRun.taskIdentifier,
353+
});
354+
} else {
355+
logger.warn("Task for this run has never been deployed", {
356+
taskRun: existingTaskRun.id,
357+
taskIdentifier: existingTaskRun.taskIdentifier,
358+
});
359+
}
360+
361+
await this.#markRunAsWaitingForDeploy(existingTaskRun.id);
362+
363+
// If this task is ever deployed, a new message will be enqueued after successful indexing
364+
await this.#ackAndDoMoreWork(message.messageId);
332365
return;
333366
}
334367

@@ -365,7 +398,7 @@ export class SharedQueueConsumer {
365398
messageId: message.messageId,
366399
});
367400

368-
this.#ackAndDoMoreWork(message.messageId);
401+
await this.#ackAndDoMoreWork(message.messageId);
369402
return;
370403
}
371404

@@ -722,9 +755,23 @@ export class SharedQueueConsumer {
722755
this.#doMoreWork(intervalInMs);
723756
}
724757

725-
async #nackAndDoMoreWork(messageId: string, intervalInMs?: number) {
726-
await marqs?.nackMessage(messageId);
727-
this.#doMoreWork(intervalInMs);
758+
async #nackAndDoMoreWork(messageId: string, queueIntervalInMs?: number, nackRetryInMs?: number) {
759+
const retryAt = nackRetryInMs ? Date.now() + nackRetryInMs : undefined;
760+
await marqs?.nackMessage(messageId, retryAt);
761+
this.#doMoreWork(queueIntervalInMs);
762+
}
763+
764+
async #markRunAsWaitingForDeploy(runId: string) {
765+
logger.debug("Marking run as waiting for deploy", { runId });
766+
767+
return await prisma.taskRun.update({
768+
where: {
769+
id: runId,
770+
},
771+
data: {
772+
status: "WAITING_FOR_DEPLOY",
773+
},
774+
});
728775
}
729776
}
730777

apps/webapp/app/v3/services/cancelTaskRun.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { logger } from "~/services/logger.server";
1010

1111
export const CANCELLABLE_STATUSES: Array<TaskRunStatus> = [
1212
"PENDING",
13+
"WAITING_FOR_DEPLOY",
1314
"EXECUTING",
1415
"PAUSED",
1516
"WAITING_TO_RESUME",

apps/webapp/app/v3/services/crashTaskRun.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { ResumeTaskRunDependenciesService } from "./resumeTaskRunDependencies.se
1313

1414
export const CRASHABLE_RUN_STATUSES: Array<TaskRunStatus> = [
1515
"PENDING",
16+
"WAITING_FOR_DEPLOY",
1617
"EXECUTING",
1718
"PAUSED",
1819
"WAITING_TO_RESUME",

apps/webapp/app/v3/services/createDeployedBackgroundWorker.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { CURRENT_DEPLOYMENT_LABEL } from "~/consts";
88
import { projectPubSub } from "./projectPubSub.server";
99
import { marqs } from "~/v3/marqs/index.server";
1010
import { logger } from "~/services/logger.server";
11+
import { ExecuteTasksWaitingForDeployService } from "./executeTasksWaitingForDeploy";
1112

1213
export class CreateDeployedBackgroundWorkerService extends BaseService {
1314
public async call(
@@ -96,6 +97,8 @@ export class CreateDeployedBackgroundWorkerService extends BaseService {
9697
logger.error("Failed to publish WORKER_CREATED event", { err });
9798
}
9899

100+
await ExecuteTasksWaitingForDeployService.enqueue(backgroundWorker.id, this._prisma);
101+
99102
return backgroundWorker;
100103
});
101104
}

0 commit comments

Comments
 (0)