From ca17fe90b6f7dcde8fc21ca410441a3ea6c0b63f Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 7 Feb 2025 10:47:58 +0000 Subject: [PATCH] Use idempotency key index when finding existing runs by idempotency key in batch --- .../app/v3/services/batchTriggerV3.server.ts | 48 ++++++++++++------- references/v3-catalog/src/trigger/batch.ts | 20 ++++++-- 2 files changed, 48 insertions(+), 20 deletions(-) diff --git a/apps/webapp/app/v3/services/batchTriggerV3.server.ts b/apps/webapp/app/v3/services/batchTriggerV3.server.ts index 4d6039c912..fdbc8d9c64 100644 --- a/apps/webapp/app/v3/services/batchTriggerV3.server.ts +++ b/apps/webapp/app/v3/services/batchTriggerV3.server.ts @@ -329,24 +329,40 @@ export class BatchTriggerV3Service extends BaseService { })); } - const idempotencyKeys = body.items.map((i) => i.options?.idempotencyKey).filter(Boolean); + // Group items by taskIdentifier + const itemsByTask = body.items.reduce((acc, item) => { + if (!item.options?.idempotencyKey) return acc; - const cachedRuns = - idempotencyKeys.length > 0 - ? await this._prisma.taskRun.findMany({ - where: { - runtimeEnvironmentId: environment.id, - idempotencyKey: { - in: body.items.map((i) => i.options?.idempotencyKey).filter(Boolean), - }, - }, - select: { - friendlyId: true, - idempotencyKey: true, - idempotencyKeyExpiresAt: true, + if (!acc[item.task]) { + acc[item.task] = []; + } + acc[item.task].push(item); + return acc; + }, {} as Record); + + logger.debug("[BatchTriggerV2][call] Grouped items by task identifier", { + itemsByTask, + }); + + // Fetch cached runs for each task identifier separately to make use of the index + const cachedRuns = await Promise.all( + Object.entries(itemsByTask).map(([taskIdentifier, items]) => + this._prisma.taskRun.findMany({ + where: { + runtimeEnvironmentId: environment.id, + taskIdentifier, + idempotencyKey: { + in: items.map((i) => i.options?.idempotencyKey).filter(Boolean), }, - }) - : []; + }, + select: { + friendlyId: true, + idempotencyKey: true, + idempotencyKeyExpiresAt: true, + }, + }) + ) + ).then((results) => results.flat()); // Now we need to create an array of all the run IDs, in order // If we have a cached run, that isn't expired, we should use that run ID diff --git a/references/v3-catalog/src/trigger/batch.ts b/references/v3-catalog/src/trigger/batch.ts index dd564d971a..032c6ddaee 100644 --- a/references/v3-catalog/src/trigger/batch.ts +++ b/references/v3-catalog/src/trigger/batch.ts @@ -126,12 +126,24 @@ export const allV2TestTask = task({ retry: { maxAttempts: 1, }, - run: async ({ triggerSequentially }: { triggerSequentially?: boolean }) => { + run: async ({ triggerSequentially }: { triggerSequentially?: boolean }, { ctx }) => { const response1 = await batch.trigger( [ - { id: "all-v2-test-child-1", payload: { child1: "foo" } }, - { id: "all-v2-test-child-2", payload: { child2: "bar" } }, - { id: "all-v2-test-child-1", payload: { child1: "baz" } }, + { + id: "all-v2-test-child-1", + payload: { child1: "foo" }, + options: { idempotencyKey: randomUUID() }, + }, + { + id: "all-v2-test-child-2", + payload: { child2: "bar" }, + options: { idempotencyKey: randomUUID() }, + }, + { + id: "all-v2-test-child-1", + payload: { child1: "baz" }, + options: { idempotencyKey: randomUUID() }, + }, ], { triggerSequentially,