Skip to content

Commit b586762

Browse files
authored
Use idempotency key index when finding existing runs by idempotency key in batch (#1677)
1 parent 0b555fa commit b586762

File tree

2 files changed

+48
-20
lines changed

2 files changed

+48
-20
lines changed

apps/webapp/app/v3/services/batchTriggerV3.server.ts

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -329,24 +329,40 @@ export class BatchTriggerV3Service extends BaseService {
329329
}));
330330
}
331331

332-
const idempotencyKeys = body.items.map((i) => i.options?.idempotencyKey).filter(Boolean);
332+
// Group items by taskIdentifier
333+
const itemsByTask = body.items.reduce((acc, item) => {
334+
if (!item.options?.idempotencyKey) return acc;
333335

334-
const cachedRuns =
335-
idempotencyKeys.length > 0
336-
? await this._prisma.taskRun.findMany({
337-
where: {
338-
runtimeEnvironmentId: environment.id,
339-
idempotencyKey: {
340-
in: body.items.map((i) => i.options?.idempotencyKey).filter(Boolean),
341-
},
342-
},
343-
select: {
344-
friendlyId: true,
345-
idempotencyKey: true,
346-
idempotencyKeyExpiresAt: true,
336+
if (!acc[item.task]) {
337+
acc[item.task] = [];
338+
}
339+
acc[item.task].push(item);
340+
return acc;
341+
}, {} as Record<string, typeof body.items>);
342+
343+
logger.debug("[BatchTriggerV2][call] Grouped items by task identifier", {
344+
itemsByTask,
345+
});
346+
347+
// Fetch cached runs for each task identifier separately to make use of the index
348+
const cachedRuns = await Promise.all(
349+
Object.entries(itemsByTask).map(([taskIdentifier, items]) =>
350+
this._prisma.taskRun.findMany({
351+
where: {
352+
runtimeEnvironmentId: environment.id,
353+
taskIdentifier,
354+
idempotencyKey: {
355+
in: items.map((i) => i.options?.idempotencyKey).filter(Boolean),
347356
},
348-
})
349-
: [];
357+
},
358+
select: {
359+
friendlyId: true,
360+
idempotencyKey: true,
361+
idempotencyKeyExpiresAt: true,
362+
},
363+
})
364+
)
365+
).then((results) => results.flat());
350366

351367
// Now we need to create an array of all the run IDs, in order
352368
// If we have a cached run, that isn't expired, we should use that run ID

references/v3-catalog/src/trigger/batch.ts

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,12 +126,24 @@ export const allV2TestTask = task({
126126
retry: {
127127
maxAttempts: 1,
128128
},
129-
run: async ({ triggerSequentially }: { triggerSequentially?: boolean }) => {
129+
run: async ({ triggerSequentially }: { triggerSequentially?: boolean }, { ctx }) => {
130130
const response1 = await batch.trigger<typeof allV2ChildTask1 | typeof allV2ChildTask2>(
131131
[
132-
{ id: "all-v2-test-child-1", payload: { child1: "foo" } },
133-
{ id: "all-v2-test-child-2", payload: { child2: "bar" } },
134-
{ id: "all-v2-test-child-1", payload: { child1: "baz" } },
132+
{
133+
id: "all-v2-test-child-1",
134+
payload: { child1: "foo" },
135+
options: { idempotencyKey: randomUUID() },
136+
},
137+
{
138+
id: "all-v2-test-child-2",
139+
payload: { child2: "bar" },
140+
options: { idempotencyKey: randomUUID() },
141+
},
142+
{
143+
id: "all-v2-test-child-1",
144+
payload: { child1: "baz" },
145+
options: { idempotencyKey: randomUUID() },
146+
},
135147
],
136148
{
137149
triggerSequentially,

0 commit comments

Comments
 (0)