From 65b91a97024686247d9b53e267b59d67ed6f5d33 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Tue, 4 Feb 2025 13:32:42 +0000 Subject: [PATCH] Reduce contention on batchTaskRun when setting expected count --- .../app/v3/services/batchTriggerV3.server.ts | 151 ++++++++++-------- internal-packages/database/src/transaction.ts | 18 ++- 2 files changed, 105 insertions(+), 64 deletions(-) diff --git a/apps/webapp/app/v3/services/batchTriggerV3.server.ts b/apps/webapp/app/v3/services/batchTriggerV3.server.ts index 72e56908a0..9bcb9eab56 100644 --- a/apps/webapp/app/v3/services/batchTriggerV3.server.ts +++ b/apps/webapp/app/v3/services/batchTriggerV3.server.ts @@ -760,10 +760,21 @@ export class BatchTriggerV3Service extends BaseService { })); let workingIndex = currentIndex; + let expectedCount = 0; for (const item of itemsToProcess) { try { - await this.#processBatchTaskRunItem(batch, environment, item, workingIndex, options); + const created = await this.#processBatchTaskRunItem( + batch, + environment, + item, + workingIndex, + options + ); + + if (created) { + expectedCount++; + } workingIndex++; } catch (error) { @@ -780,6 +791,17 @@ export class BatchTriggerV3Service extends BaseService { } } + if (expectedCount > 0) { + await this._prisma.batchTaskRun.update({ + where: { id: batch.id }, + data: { + expectedCount: { + increment: expectedCount, + }, + }, + }); + } + return { workingIndex }; } @@ -825,21 +847,15 @@ export class BatchTriggerV3Service extends BaseService { if (!result.isCached) { try { - await $transaction(this._prisma, async (tx) => { - // [batchTaskRunId, taskRunId] is a unique index - await tx.batchTaskRunItem.create({ - data: { - batchTaskRunId: batch.id, - taskRunId: result.run.id, - status: batchTaskRunItemStatusForRunStatus(result.run.status), - }, - }); - - await tx.batchTaskRun.update({ - where: { id: batch.id }, - data: { expectedCount: { increment: 1 } }, - }); + await this._prisma.batchTaskRunItem.create({ + data: { + batchTaskRunId: batch.id, + taskRunId: result.run.id, + status: batchTaskRunItemStatusForRunStatus(result.run.status), + }, }); + + return true; } catch (error) { if (isUniqueConstraintError(error, ["batchTaskRunId", "taskRunId"])) { // This means there is already a batchTaskRunItem for this batch and taskRun @@ -852,12 +868,14 @@ export class BatchTriggerV3Service extends BaseService { } ); - return; + return false; } throw error; } } + + return false; } async #enqueueBatchTaskRun(options: BatchProcessingOptions, tx?: PrismaClientOrTransaction) { @@ -907,62 +925,69 @@ export async function completeBatchTaskRunItemV3( scheduleResumeOnComplete = false, taskRunAttemptId?: string ) { - await $transaction(tx, "completeBatchTaskRunItemV3", async (tx, span) => { - span?.setAttribute("batch_id", batchTaskRunId); - - // Update the item to complete - const updated = await tx.batchTaskRunItem.updateMany({ - where: { - id: itemId, - status: "PENDING", - }, - data: { - status: "COMPLETED", - taskRunAttemptId, - }, - }); - - if (updated.count === 0) { - return; - } - - const updatedBatchRun = await tx.batchTaskRun.update({ - where: { - id: batchTaskRunId, - }, - data: { - completedCount: { - increment: 1, + await $transaction( + tx, + "completeBatchTaskRunItemV3", + async (tx, span) => { + span?.setAttribute("batch_id", batchTaskRunId); + + // Update the item to complete + const updated = await tx.batchTaskRunItem.updateMany({ + where: { + id: itemId, + status: "PENDING", }, - }, - select: { - sealed: true, - status: true, - completedCount: true, - expectedCount: true, - dependentTaskAttemptId: true, - }, - }); + data: { + status: "COMPLETED", + taskRunAttemptId, + }, + }); + + if (updated.count === 0) { + return; + } - if ( - updatedBatchRun.status === "PENDING" && - updatedBatchRun.completedCount === updatedBatchRun.expectedCount && - updatedBatchRun.sealed - ) { - await tx.batchTaskRun.update({ + const updatedBatchRun = await tx.batchTaskRun.update({ where: { id: batchTaskRunId, }, data: { - status: "COMPLETED", - completedAt: new Date(), + completedCount: { + increment: 1, + }, + }, + select: { + sealed: true, + status: true, + completedCount: true, + expectedCount: true, + dependentTaskAttemptId: true, }, }); - // We only need to resume the batch if it has a dependent task attempt ID - if (scheduleResumeOnComplete && updatedBatchRun.dependentTaskAttemptId) { - await ResumeBatchRunService.enqueue(batchTaskRunId, true, tx); + if ( + updatedBatchRun.status === "PENDING" && + updatedBatchRun.completedCount === updatedBatchRun.expectedCount && + updatedBatchRun.sealed + ) { + await tx.batchTaskRun.update({ + where: { + id: batchTaskRunId, + }, + data: { + status: "COMPLETED", + completedAt: new Date(), + }, + }); + + // We only need to resume the batch if it has a dependent task attempt ID + if (scheduleResumeOnComplete && updatedBatchRun.dependentTaskAttemptId) { + await ResumeBatchRunService.enqueue(batchTaskRunId, true, tx); + } } + }, + { + timeout: 10000, } - }); + ); } diff --git a/internal-packages/database/src/transaction.ts b/internal-packages/database/src/transaction.ts index 81eaa12bc3..e4cc2aa4df 100644 --- a/internal-packages/database/src/transaction.ts +++ b/internal-packages/database/src/transaction.ts @@ -30,13 +30,21 @@ export type PrismaTransactionOptions = { isolationLevel?: Prisma.TransactionIsolationLevel; swallowPrismaErrors?: boolean; + + /** + * The maximum number of times the transaction will be retried in case of a serialization failure. The default value is 0. + * + * See https://www.prisma.io/docs/orm/prisma-client/queries/transactions#transaction-timing-issues + */ + maxRetries?: number; }; export async function $transaction( prisma: PrismaClientOrTransaction, fn: (prisma: PrismaTransactionClient) => Promise, prismaError: (error: Prisma.PrismaClientKnownRequestError) => void, - options?: PrismaTransactionOptions + options?: PrismaTransactionOptions, + attempt = 0 ): Promise { if (isTransactionClient(prisma)) { return fn(prisma); @@ -46,6 +54,14 @@ export async function $transaction( return await (prisma as PrismaClient).$transaction(fn, options); } catch (error) { if (isPrismaKnownError(error)) { + if ( + error.code === "P2034" && + typeof options?.maxRetries === "number" && + attempt < options.maxRetries + ) { + return $transaction(prisma, fn, prismaError, options, attempt + 1); + } + prismaError(error); if (options?.swallowPrismaErrors) {