Skip to content

Commit 813d73d

Browse files
authored
Fix issue with heavy contention on TaskQueue updating concurrency limit (#1653)
* Fix issue with heavy contention on TaskQueue updating concurrency limit * Remove concurrency limit when creating background worker tasks
1 parent 9ad396b commit 813d73d

File tree

6 files changed

+118
-94
lines changed

6 files changed

+118
-94
lines changed

.changeset/funny-emus-pay.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/core": patch
3+
---
4+
5+
Allow setting concurrencyLimit to null to signal removing the concurrency limit on the queue

apps/webapp/app/v3/services/createBackgroundWorker.server.ts

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -180,29 +180,29 @@ export async function createBackgroundTasks(
180180
),
181181
0
182182
)
183-
: null;
183+
: task.queue?.concurrencyLimit;
184184

185-
const taskQueue = await prisma.taskQueue.upsert({
185+
let taskQueue = await prisma.taskQueue.findFirst({
186186
where: {
187-
runtimeEnvironmentId_name: {
188-
runtimeEnvironmentId: worker.runtimeEnvironmentId,
189-
name: queueName,
190-
},
191-
},
192-
update: {
193-
concurrencyLimit,
194-
},
195-
create: {
196-
friendlyId: generateFriendlyId("queue"),
197-
name: queueName,
198-
concurrencyLimit,
199187
runtimeEnvironmentId: worker.runtimeEnvironmentId,
200-
projectId: worker.projectId,
201-
type: task.queue?.name ? "NAMED" : "VIRTUAL",
188+
name: queueName,
202189
},
203190
});
204191

205-
if (typeof taskQueue.concurrencyLimit === "number") {
192+
if (!taskQueue) {
193+
taskQueue = await prisma.taskQueue.create({
194+
data: {
195+
friendlyId: generateFriendlyId("queue"),
196+
name: queueName,
197+
concurrencyLimit,
198+
runtimeEnvironmentId: worker.runtimeEnvironmentId,
199+
projectId: worker.projectId,
200+
type: task.queue?.name ? "NAMED" : "VIRTUAL",
201+
},
202+
});
203+
}
204+
205+
if (typeof concurrencyLimit === "number") {
206206
logger.debug("CreateBackgroundWorkerService: updating concurrency limit", {
207207
workerId: worker.id,
208208
taskQueue,
@@ -212,11 +212,7 @@ export async function createBackgroundTasks(
212212
concurrencyLimit,
213213
taskidentifier: task.id,
214214
});
215-
await marqs?.updateQueueConcurrencyLimits(
216-
environment,
217-
taskQueue.name,
218-
taskQueue.concurrencyLimit
219-
);
215+
await marqs?.updateQueueConcurrencyLimits(environment, taskQueue.name, concurrencyLimit);
220216
} else {
221217
logger.debug("CreateBackgroundWorkerService: removing concurrency limit", {
222218
workerId: worker.id,
@@ -227,6 +223,7 @@ export async function createBackgroundTasks(
227223
concurrencyLimit,
228224
taskidentifier: task.id,
229225
});
226+
230227
await marqs?.removeQueueConcurrencyLimits(environment, taskQueue.name);
231228
}
232229
} catch (error) {

apps/webapp/app/v3/services/replayTaskRun.server.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ export class ReplayTaskRunService extends BaseService {
9696
queue: taskQueue
9797
? {
9898
name: taskQueue.name,
99-
concurrencyLimit: taskQueue.concurrencyLimit ?? undefined,
10099
}
101100
: undefined,
102101
concurrencyKey: existingTaskRun.concurrencyKey ?? undefined,

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 44 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,7 @@ export class TriggerTaskService extends BaseService {
450450
),
451451
0
452452
)
453-
: null;
453+
: body.options.queue?.concurrencyLimit;
454454

455455
let taskQueue = await tx.taskQueue.findFirst({
456456
where: {
@@ -459,74 +459,47 @@ export class TriggerTaskService extends BaseService {
459459
},
460460
});
461461

462-
const existingConcurrencyLimit =
463-
typeof taskQueue?.concurrencyLimit === "number"
464-
? taskQueue.concurrencyLimit
465-
: undefined;
466-
467-
if (taskQueue) {
468-
if (existingConcurrencyLimit !== concurrencyLimit) {
469-
taskQueue = await tx.taskQueue.update({
470-
where: {
471-
id: taskQueue.id,
472-
},
473-
data: {
474-
concurrencyLimit:
475-
typeof concurrencyLimit === "number" ? concurrencyLimit : null,
476-
},
477-
});
478-
479-
if (typeof taskQueue.concurrencyLimit === "number") {
480-
logger.debug("TriggerTaskService: updating concurrency limit", {
481-
runId: taskRun.id,
482-
friendlyId: taskRun.friendlyId,
483-
taskQueue,
484-
orgId: environment.organizationId,
485-
projectId: environment.projectId,
486-
existingConcurrencyLimit,
487-
concurrencyLimit,
488-
queueOptions: body.options?.queue,
489-
});
490-
await marqs?.updateQueueConcurrencyLimits(
491-
environment,
492-
taskQueue.name,
493-
taskQueue.concurrencyLimit
494-
);
495-
} else {
496-
logger.debug("TriggerTaskService: removing concurrency limit", {
497-
runId: taskRun.id,
498-
friendlyId: taskRun.friendlyId,
499-
taskQueue,
500-
orgId: environment.organizationId,
501-
projectId: environment.projectId,
502-
existingConcurrencyLimit,
503-
concurrencyLimit,
504-
queueOptions: body.options?.queue,
505-
});
506-
await marqs?.removeQueueConcurrencyLimits(environment, taskQueue.name);
507-
}
508-
}
509-
} else {
510-
const queueId = generateFriendlyId("queue");
511-
462+
if (!taskQueue) {
463+
// handle conflicts with existing queues
512464
taskQueue = await tx.taskQueue.create({
513465
data: {
514-
friendlyId: queueId,
466+
friendlyId: generateFriendlyId("queue"),
515467
name: queueName,
516468
concurrencyLimit,
517469
runtimeEnvironmentId: environment.id,
518470
projectId: environment.projectId,
519471
type: "NAMED",
520472
},
521473
});
474+
}
522475

523-
if (typeof taskQueue.concurrencyLimit === "number") {
524-
await marqs?.updateQueueConcurrencyLimits(
525-
environment,
526-
taskQueue.name,
527-
taskQueue.concurrencyLimit
528-
);
529-
}
476+
if (typeof concurrencyLimit === "number") {
477+
logger.debug("TriggerTaskService: updating concurrency limit", {
478+
runId: taskRun.id,
479+
friendlyId: taskRun.friendlyId,
480+
taskQueue,
481+
orgId: environment.organizationId,
482+
projectId: environment.projectId,
483+
concurrencyLimit,
484+
queueOptions: body.options?.queue,
485+
});
486+
487+
await marqs?.updateQueueConcurrencyLimits(
488+
environment,
489+
taskQueue.name,
490+
concurrencyLimit
491+
);
492+
} else if (concurrencyLimit === null) {
493+
logger.debug("TriggerTaskService: removing concurrency limit", {
494+
runId: taskRun.id,
495+
friendlyId: taskRun.friendlyId,
496+
taskQueue,
497+
orgId: environment.organizationId,
498+
projectId: environment.projectId,
499+
queueOptions: body.options?.queue,
500+
});
501+
502+
await marqs?.removeQueueConcurrencyLimits(environment, taskQueue.name);
530503
}
531504
}
532505

@@ -623,6 +596,18 @@ export class TriggerTaskService extends BaseService {
623596
throw new ServiceValidationError(
624597
`Cannot trigger ${taskId} with a one-time use token as it has already been used.`
625598
);
599+
} else if (
600+
Array.isArray(target) &&
601+
target.length == 2 &&
602+
typeof target[0] === "string" &&
603+
typeof target[1] === "string" &&
604+
target[0] == "runtimeEnvironmentId" &&
605+
target[1] == "name" &&
606+
error.message.includes("prisma.taskQueue.create")
607+
) {
608+
throw new Error(
609+
`Failed to trigger ${taskId} as the queue could not be created do to a unique constraint error, please try again.`
610+
);
626611
} else {
627612
throw new ServiceValidationError(
628613
`Cannot trigger ${taskId} as it has already been triggered with the same idempotency key.`

packages/core/src/v3/schemas/schemas.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,8 @@ export const QueueOptions = z.object({
134134
name: z.string().optional(),
135135
/** An optional property that specifies the maximum number of concurrent run executions.
136136
*
137-
* If this property is omitted, the task can potentially use up the full concurrency of an environment. */
138-
concurrencyLimit: z.number().int().min(0).max(1000).optional(),
137+
* If this property is omitted, the task can potentially use up the full concurrency of an environment */
138+
concurrencyLimit: z.number().int().min(0).max(1000).optional().nullable(),
139139
});
140140

141141
export type QueueOptions = z.infer<typeof QueueOptions>;

references/v3-catalog/src/trigger/queues.ts

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,53 @@ export const queuesController = task({
1111
length?: number;
1212
waitSeconds?: number;
1313
}) => {
14-
await queuesTest.batchTriggerAndWait(
15-
Array.from({ length }, (_, i) => ({
16-
payload: { waitSeconds },
17-
options: {
14+
await Promise.all([
15+
queuesTest.trigger(
16+
{ waitSeconds },
17+
{
1818
queue: {
19-
name: `queue-${i % numberOfQueues}`,
19+
name: "controller-3",
20+
concurrencyLimit: 9,
2021
},
21-
},
22-
}))
23-
);
22+
}
23+
),
24+
queuesTest.trigger(
25+
{ waitSeconds },
26+
{
27+
queue: {
28+
name: "controller-3",
29+
concurrencyLimit: 9,
30+
},
31+
}
32+
),
33+
queuesTest.trigger(
34+
{ waitSeconds },
35+
{
36+
queue: {
37+
name: "controller-3",
38+
concurrencyLimit: 9,
39+
},
40+
}
41+
),
42+
queuesTest.trigger(
43+
{ waitSeconds },
44+
{
45+
queue: {
46+
name: "controller-3",
47+
concurrencyLimit: 9,
48+
},
49+
}
50+
),
51+
queuesTest.trigger(
52+
{ waitSeconds },
53+
{
54+
queue: {
55+
name: "controller-3",
56+
concurrencyLimit: 9,
57+
},
58+
}
59+
),
60+
]);
2461
},
2562
});
2663

@@ -34,10 +71,11 @@ export const queuesTest = task({
3471
export const namedQueueTask = task({
3572
id: "queues/named-queue",
3673
queue: {
37-
name: "named-queue",
74+
name: "controller",
75+
concurrencyLimit: 9,
3876
},
3977
run: async () => {
40-
logger.info("named-queue");
78+
logger.info("named-queue 2");
4179
},
4280
});
4381

0 commit comments

Comments
 (0)