diff --git a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts index 5ee2a7dbee..60221d866c 100644 --- a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts +++ b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts @@ -363,43 +363,14 @@ async function createWorkerQueue( ) : queue.concurrencyLimit; - let taskQueue = await prisma.taskQueue.findFirst({ - where: { - runtimeEnvironmentId: worker.runtimeEnvironmentId, - name: queueName, - }, - }); - - if (!taskQueue) { - taskQueue = await prisma.taskQueue.create({ - data: { - friendlyId: generateFriendlyId("queue"), - version: "V2", - name: queueName, - orderableName, - concurrencyLimit, - runtimeEnvironmentId: worker.runtimeEnvironmentId, - projectId: worker.projectId, - type: queueType, - workers: { - connect: { - id: worker.id, - }, - }, - }, - }); - } else { - await prisma.taskQueue.update({ - where: { - id: taskQueue.id, - }, - data: { - workers: { connect: { id: worker.id } }, - version: "V2", - orderableName, - }, - }); - } + const taskQueue = await upsertWorkerQueueRecord( + queueName, + concurrencyLimit ?? undefined, + orderableName, + queueType, + worker, + prisma + ); if (typeof concurrencyLimit === "number") { logger.debug("createWorkerQueue: updating concurrency limit", { @@ -426,6 +397,75 @@ async function createWorkerQueue( return taskQueue; } +async function upsertWorkerQueueRecord( + queueName: string, + concurrencyLimit: number | undefined, + orderableName: string, + queueType: TaskQueueType, + worker: BackgroundWorker, + prisma: PrismaClientOrTransaction, + attempt: number = 0 +): Promise { + if (attempt > 3) { + throw new Error("Failed to insert queue record"); + } + + try { + let taskQueue = await prisma.taskQueue.findFirst({ + where: { + runtimeEnvironmentId: worker.runtimeEnvironmentId, + name: queueName, + }, + }); + + if (!taskQueue) { + taskQueue = await prisma.taskQueue.create({ + data: { + friendlyId: generateFriendlyId("queue"), + version: "V2", + name: queueName, + orderableName, + concurrencyLimit, + runtimeEnvironmentId: worker.runtimeEnvironmentId, + projectId: worker.projectId, + type: queueType, + workers: { + connect: { + id: worker.id, + }, + }, + }, + }); + } else { + await prisma.taskQueue.update({ + where: { + id: taskQueue.id, + }, + data: { + workers: { connect: { id: worker.id } }, + version: "V2", + orderableName, + }, + }); + } + + return taskQueue; + } catch (error) { + // If the queue already exists, let's try again + if (error instanceof Prisma.PrismaClientKnownRequestError && error.code === "P2002") { + return await upsertWorkerQueueRecord( + queueName, + concurrencyLimit, + orderableName, + queueType, + worker, + prisma, + attempt + 1 + ); + } + throw error; + } +} //CreateDeclarativeScheduleError with a message export class CreateDeclarativeScheduleError extends Error { constructor(message: string) { diff --git a/references/hello-world/src/trigger/queues.ts b/references/hello-world/src/trigger/queues.ts index 23a72fa605..9cc3074009 100644 --- a/references/hello-world/src/trigger/queues.ts +++ b/references/hello-world/src/trigger/queues.ts @@ -1,4 +1,4 @@ -import { logger, queue, queues, task } from "@trigger.dev/sdk/v3"; +import { batch, logger, queue, queues, task } from "@trigger.dev/sdk/v3"; export const queuesTester = task({ id: "queues-tester", @@ -52,3 +52,186 @@ export const otherQueueTask = task({ logger.log("Other queue task", { payload }); }, }); + +import { setTimeout } from "node:timers/promises"; + +type Payload = { + id: string; + waitSeconds: number; +}; + +export const myQueue = queue({ + name: "shared-queue", + concurrencyLimit: 2, +}); + +// First task type that uses shared queue +export const sharedQueueTask1 = task({ + id: "shared-queue-task-1", + queue: myQueue, + run: async (payload: Payload) => { + const startedAt = Date.now(); + logger.info(`Task1 ${payload.id} started at ${startedAt}`); + + await setTimeout(payload.waitSeconds * 1000); + + const completedAt = Date.now(); + logger.info(`Task1 ${payload.id} completed at ${completedAt}`); + + return { + id: payload.id, + startedAt, + completedAt, + }; + }, +}); + +// Second task type that uses the same queue +export const sharedQueueTask2 = task({ + id: "shared-queue-task-2", + queue: myQueue, + run: async (payload: Payload) => { + const startedAt = Date.now(); + logger.info(`Task2 ${payload.id} started at ${startedAt}`); + + await setTimeout(payload.waitSeconds * 1000); + + const completedAt = Date.now(); + logger.info(`Task2 ${payload.id} completed at ${completedAt}`); + + return { + id: payload.id, + startedAt, + completedAt, + }; + }, +}); + +export const sharedQueueTask3 = task({ + id: "shared-queue-task-3", + queue: myQueue, + run: async (payload: Payload) => { + const startedAt = Date.now(); + logger.info(`Task2 ${payload.id} started at ${startedAt}`); + + await setTimeout(payload.waitSeconds * 1000); + + const completedAt = Date.now(); + logger.info(`Task2 ${payload.id} completed at ${completedAt}`); + + return { + id: payload.id, + startedAt, + completedAt, + }; + }, +}); + +export const sharedQueueTask4 = task({ + id: "shared-queue-task-4", + queue: myQueue, + run: async (payload: Payload) => { + const startedAt = Date.now(); + logger.info(`Task2 ${payload.id} started at ${startedAt}`); + + await setTimeout(payload.waitSeconds * 1000); + + const completedAt = Date.now(); + logger.info(`Task2 ${payload.id} completed at ${completedAt}`); + + return { + id: payload.id, + startedAt, + completedAt, + }; + }, +}); + +export const sharedQueueTask5 = task({ + id: "shared-queue-task-5", + queue: myQueue, + run: async (payload: Payload) => { + const startedAt = Date.now(); + logger.info(`Task2 ${payload.id} started at ${startedAt}`); + + await setTimeout(payload.waitSeconds * 1000); + + const completedAt = Date.now(); + logger.info(`Task2 ${payload.id} completed at ${completedAt}`); + + return { + id: payload.id, + startedAt, + completedAt, + }; + }, +}); + +// Test task that verifies shared queue concurrency +export const sharedQueueTestTask = task({ + id: "shared-queue-test", + retry: { + maxAttempts: 1, + }, + // 4 minutes + maxDuration: 240, + run: async (payload, { ctx }) => { + logger.info("Starting shared queue concurrency test"); + + // Trigger mix of both task types (5 total tasks) + // With concurrencyLimit: 2, we expect only 2 running at once + // regardless of task type + const results = await batch.triggerAndWait([ + { id: sharedQueueTask1.id, payload: { id: "t1-1", waitSeconds: 4 } }, + { id: sharedQueueTask2.id, payload: { id: "t2-1", waitSeconds: 4 } }, + { id: sharedQueueTask1.id, payload: { id: "t1-2", waitSeconds: 4 } }, + { id: sharedQueueTask2.id, payload: { id: "t2-2", waitSeconds: 4 } }, + { id: sharedQueueTask1.id, payload: { id: "t1-3", waitSeconds: 4 } }, + ]); + + // Verify all tasks completed successfully + if (!results.runs.every((r) => r.ok)) { + throw new Error("One or more tasks failed"); + } + + // Get all executions sorted by start time + const executions = results.runs.map((r) => r.output).sort((a, b) => a.startedAt - b.startedAt); + + // For each point in time, count how many tasks were running + let maxConcurrent = 0; + for (let i = 0; i < executions.length; i++) { + const current = executions[i]; + const concurrent = + executions.filter( + (task) => + task.id !== current.id && // not the same task + task.startedAt <= current.startedAt && // started before or at same time + task.completedAt >= current.startedAt // hadn't completed yet + ).length + 1; // +1 for current task + + maxConcurrent = Math.max(maxConcurrent, concurrent); + } + + // Verify we never exceeded the concurrency limit + if (maxConcurrent > 2) { + throw new Error(`Expected maximum of 2 concurrent tasks, but found ${maxConcurrent}`); + } + + // Verify tasks from both types were able to run + const task1Runs = executions.filter((e) => e.id.startsWith("t1-")).length; + const task2Runs = executions.filter((e) => e.id.startsWith("t2-")).length; + + if (task1Runs === 0 || task2Runs === 0) { + throw new Error( + `Expected both task types to run, but got ${task1Runs} task1 runs and ${task2Runs} task2 runs` + ); + } + + return { + executions, + maxConcurrent, + task1Count: task1Runs, + task2Count: task2Runs, + }; + }, +});