From 9d0bf587475cdc3491b65e4d3aa898f87de8df36 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 30 Jan 2025 11:13:27 +0000 Subject: [PATCH 1/2] Fix issue with heavy contention on TaskQueue updating concurrency limit --- .changeset/funny-emus-pay.md | 5 + .../services/createBackgroundWorker.server.ts | 43 ++++---- .../app/v3/services/replayTaskRun.server.ts | 1 - .../app/v3/services/triggerTask.server.ts | 103 ++++++++---------- packages/core/src/v3/schemas/schemas.ts | 4 +- references/v3-catalog/src/trigger/queues.ts | 58 ++++++++-- 6 files changed, 119 insertions(+), 95 deletions(-) create mode 100644 .changeset/funny-emus-pay.md diff --git a/.changeset/funny-emus-pay.md b/.changeset/funny-emus-pay.md new file mode 100644 index 0000000000..f5b182c354 --- /dev/null +++ b/.changeset/funny-emus-pay.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/core": patch +--- + +Allow setting concurrencyLimit to null to signal removing the concurrency limit on the queue diff --git a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts index 65b8cd3cc8..c6d0d9e492 100644 --- a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts +++ b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts @@ -180,29 +180,29 @@ export async function createBackgroundTasks( ), 0 ) - : null; + : task.queue?.concurrencyLimit; - const taskQueue = await prisma.taskQueue.upsert({ + let taskQueue = await prisma.taskQueue.findFirst({ where: { - runtimeEnvironmentId_name: { - runtimeEnvironmentId: worker.runtimeEnvironmentId, - name: queueName, - }, - }, - update: { - concurrencyLimit, - }, - create: { - friendlyId: generateFriendlyId("queue"), - name: queueName, - concurrencyLimit, runtimeEnvironmentId: worker.runtimeEnvironmentId, - projectId: worker.projectId, - type: task.queue?.name ? "NAMED" : "VIRTUAL", + name: queueName, }, }); - if (typeof taskQueue.concurrencyLimit === "number") { + if (!taskQueue) { + taskQueue = await prisma.taskQueue.create({ + data: { + friendlyId: generateFriendlyId("queue"), + name: queueName, + concurrencyLimit, + runtimeEnvironmentId: worker.runtimeEnvironmentId, + projectId: worker.projectId, + type: task.queue?.name ? "NAMED" : "VIRTUAL", + }, + }); + } + + if (typeof concurrencyLimit === "number") { logger.debug("CreateBackgroundWorkerService: updating concurrency limit", { workerId: worker.id, taskQueue, @@ -212,12 +212,8 @@ export async function createBackgroundTasks( concurrencyLimit, taskidentifier: task.id, }); - await marqs?.updateQueueConcurrencyLimits( - environment, - taskQueue.name, - taskQueue.concurrencyLimit - ); - } else { + await marqs?.updateQueueConcurrencyLimits(environment, taskQueue.name, concurrencyLimit); + } else if (concurrencyLimit === null) { logger.debug("CreateBackgroundWorkerService: removing concurrency limit", { workerId: worker.id, taskQueue, @@ -227,6 +223,7 @@ export async function createBackgroundTasks( concurrencyLimit, taskidentifier: task.id, }); + await marqs?.removeQueueConcurrencyLimits(environment, taskQueue.name); } } catch (error) { diff --git a/apps/webapp/app/v3/services/replayTaskRun.server.ts b/apps/webapp/app/v3/services/replayTaskRun.server.ts index 601bb8a075..cb97c07995 100644 --- a/apps/webapp/app/v3/services/replayTaskRun.server.ts +++ b/apps/webapp/app/v3/services/replayTaskRun.server.ts @@ -96,7 +96,6 @@ export class ReplayTaskRunService extends BaseService { queue: taskQueue ? { name: taskQueue.name, - concurrencyLimit: taskQueue.concurrencyLimit ?? undefined, } : undefined, concurrencyKey: existingTaskRun.concurrencyKey ?? undefined, diff --git a/apps/webapp/app/v3/services/triggerTask.server.ts b/apps/webapp/app/v3/services/triggerTask.server.ts index 98c4e21005..5c0008169a 100644 --- a/apps/webapp/app/v3/services/triggerTask.server.ts +++ b/apps/webapp/app/v3/services/triggerTask.server.ts @@ -450,7 +450,7 @@ export class TriggerTaskService extends BaseService { ), 0 ) - : null; + : body.options.queue?.concurrencyLimit; let taskQueue = await tx.taskQueue.findFirst({ where: { @@ -459,59 +459,11 @@ export class TriggerTaskService extends BaseService { }, }); - const existingConcurrencyLimit = - typeof taskQueue?.concurrencyLimit === "number" - ? taskQueue.concurrencyLimit - : undefined; - - if (taskQueue) { - if (existingConcurrencyLimit !== concurrencyLimit) { - taskQueue = await tx.taskQueue.update({ - where: { - id: taskQueue.id, - }, - data: { - concurrencyLimit: - typeof concurrencyLimit === "number" ? concurrencyLimit : null, - }, - }); - - if (typeof taskQueue.concurrencyLimit === "number") { - logger.debug("TriggerTaskService: updating concurrency limit", { - runId: taskRun.id, - friendlyId: taskRun.friendlyId, - taskQueue, - orgId: environment.organizationId, - projectId: environment.projectId, - existingConcurrencyLimit, - concurrencyLimit, - queueOptions: body.options?.queue, - }); - await marqs?.updateQueueConcurrencyLimits( - environment, - taskQueue.name, - taskQueue.concurrencyLimit - ); - } else { - logger.debug("TriggerTaskService: removing concurrency limit", { - runId: taskRun.id, - friendlyId: taskRun.friendlyId, - taskQueue, - orgId: environment.organizationId, - projectId: environment.projectId, - existingConcurrencyLimit, - concurrencyLimit, - queueOptions: body.options?.queue, - }); - await marqs?.removeQueueConcurrencyLimits(environment, taskQueue.name); - } - } - } else { - const queueId = generateFriendlyId("queue"); - + if (!taskQueue) { + // handle conflicts with existing queues taskQueue = await tx.taskQueue.create({ data: { - friendlyId: queueId, + friendlyId: generateFriendlyId("queue"), name: queueName, concurrencyLimit, runtimeEnvironmentId: environment.id, @@ -519,14 +471,35 @@ export class TriggerTaskService extends BaseService { type: "NAMED", }, }); + } - if (typeof taskQueue.concurrencyLimit === "number") { - await marqs?.updateQueueConcurrencyLimits( - environment, - taskQueue.name, - taskQueue.concurrencyLimit - ); - } + if (typeof concurrencyLimit === "number") { + logger.debug("TriggerTaskService: updating concurrency limit", { + runId: taskRun.id, + friendlyId: taskRun.friendlyId, + taskQueue, + orgId: environment.organizationId, + projectId: environment.projectId, + concurrencyLimit, + queueOptions: body.options?.queue, + }); + + await marqs?.updateQueueConcurrencyLimits( + environment, + taskQueue.name, + concurrencyLimit + ); + } else if (concurrencyLimit === null) { + logger.debug("TriggerTaskService: removing concurrency limit", { + runId: taskRun.id, + friendlyId: taskRun.friendlyId, + taskQueue, + orgId: environment.organizationId, + projectId: environment.projectId, + queueOptions: body.options?.queue, + }); + + await marqs?.removeQueueConcurrencyLimits(environment, taskQueue.name); } } @@ -623,6 +596,18 @@ export class TriggerTaskService extends BaseService { throw new ServiceValidationError( `Cannot trigger ${taskId} with a one-time use token as it has already been used.` ); + } else if ( + Array.isArray(target) && + target.length == 2 && + typeof target[0] === "string" && + typeof target[1] === "string" && + target[0] == "runtimeEnvironmentId" && + target[1] == "name" && + error.message.includes("prisma.taskQueue.create") + ) { + throw new Error( + `Failed to trigger ${taskId} as the queue could not be created do to a unique constraint error, please try again.` + ); } else { throw new ServiceValidationError( `Cannot trigger ${taskId} as it has already been triggered with the same idempotency key.` diff --git a/packages/core/src/v3/schemas/schemas.ts b/packages/core/src/v3/schemas/schemas.ts index 42edf8602e..418db11c5a 100644 --- a/packages/core/src/v3/schemas/schemas.ts +++ b/packages/core/src/v3/schemas/schemas.ts @@ -134,8 +134,8 @@ export const QueueOptions = z.object({ name: z.string().optional(), /** An optional property that specifies the maximum number of concurrent run executions. * - * If this property is omitted, the task can potentially use up the full concurrency of an environment. */ - concurrencyLimit: z.number().int().min(0).max(1000).optional(), + * If this property is omitted, the task can potentially use up the full concurrency of an environment */ + concurrencyLimit: z.number().int().min(0).max(1000).optional().nullable(), }); export type QueueOptions = z.infer; diff --git a/references/v3-catalog/src/trigger/queues.ts b/references/v3-catalog/src/trigger/queues.ts index c6b2841ee9..f2f1781322 100644 --- a/references/v3-catalog/src/trigger/queues.ts +++ b/references/v3-catalog/src/trigger/queues.ts @@ -11,16 +11,53 @@ export const queuesController = task({ length?: number; waitSeconds?: number; }) => { - await queuesTest.batchTriggerAndWait( - Array.from({ length }, (_, i) => ({ - payload: { waitSeconds }, - options: { + await Promise.all([ + queuesTest.trigger( + { waitSeconds }, + { queue: { - name: `queue-${i % numberOfQueues}`, + name: "controller-3", + concurrencyLimit: 9, }, - }, - })) - ); + } + ), + queuesTest.trigger( + { waitSeconds }, + { + queue: { + name: "controller-3", + concurrencyLimit: 9, + }, + } + ), + queuesTest.trigger( + { waitSeconds }, + { + queue: { + name: "controller-3", + concurrencyLimit: 9, + }, + } + ), + queuesTest.trigger( + { waitSeconds }, + { + queue: { + name: "controller-3", + concurrencyLimit: 9, + }, + } + ), + queuesTest.trigger( + { waitSeconds }, + { + queue: { + name: "controller-3", + concurrencyLimit: 9, + }, + } + ), + ]); }, }); @@ -34,10 +71,11 @@ export const queuesTest = task({ export const namedQueueTask = task({ id: "queues/named-queue", queue: { - name: "named-queue", + name: "controller", + concurrencyLimit: 9, }, run: async () => { - logger.info("named-queue"); + logger.info("named-queue 2"); }, }); From 96bd40649606bd9e56ac7a23f89139631f6cc3fc Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 30 Jan 2025 11:19:09 +0000 Subject: [PATCH 2/2] Remove concurrency limit when creating background worker tasks --- apps/webapp/app/v3/services/createBackgroundWorker.server.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts index c6d0d9e492..5cd5e5406c 100644 --- a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts +++ b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts @@ -213,7 +213,7 @@ export async function createBackgroundTasks( taskidentifier: task.id, }); await marqs?.updateQueueConcurrencyLimits(environment, taskQueue.name, concurrencyLimit); - } else if (concurrencyLimit === null) { + } else { logger.debug("CreateBackgroundWorkerService: removing concurrency limit", { workerId: worker.id, taskQueue,