diff --git a/apps/webapp/app/db.server.ts b/apps/webapp/app/db.server.ts index 445d7fbf49..969f3f0276 100644 --- a/apps/webapp/app/db.server.ts +++ b/apps/webapp/app/db.server.ts @@ -111,6 +111,7 @@ function getClient() { const databaseUrl = extendQueryParams(DATABASE_URL, { connection_limit: env.DATABASE_CONNECTION_LIMIT.toString(), pool_timeout: env.DATABASE_POOL_TIMEOUT.toString(), + connection_timeout: env.DATABASE_CONNECTION_TIMEOUT.toString(), }); console.log(`🔌 setting up prisma client to ${redactUrlSecrets(databaseUrl)}`); @@ -162,6 +163,7 @@ function getReplicaClient() { const replicaUrl = extendQueryParams(env.DATABASE_READ_REPLICA_URL, { connection_limit: env.DATABASE_CONNECTION_LIMIT.toString(), pool_timeout: env.DATABASE_POOL_TIMEOUT.toString(), + connection_timeout: env.DATABASE_CONNECTION_TIMEOUT.toString(), }); console.log(`🔌 setting up read replica connection to ${redactUrlSecrets(replicaUrl)}`); diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index c41b4ea19f..4facbfffb2 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -13,6 +13,7 @@ const EnvironmentSchema = z.object({ ), DATABASE_CONNECTION_LIMIT: z.coerce.number().int().default(10), DATABASE_POOL_TIMEOUT: z.coerce.number().int().default(60), + DATABASE_CONNECTION_TIMEOUT: z.coerce.number().int().default(20), DIRECT_URL: z .string() .refine( diff --git a/apps/webapp/app/v3/legacyRunEngineWorker.server.ts b/apps/webapp/app/v3/legacyRunEngineWorker.server.ts index 52a978fffc..a5ca5c8483 100644 --- a/apps/webapp/app/v3/legacyRunEngineWorker.server.ts +++ b/apps/webapp/app/v3/legacyRunEngineWorker.server.ts @@ -5,6 +5,8 @@ import { env } from "~/env.server"; import { logger } from "~/services/logger.server"; import { singleton } from "~/utils/singleton"; import { TaskRunHeartbeatFailedService } from "./taskRunHeartbeatFailed.server"; +import { completeBatchTaskRunItemV3 } from "./services/batchTriggerV3.server"; +import { prisma } from "~/db.server"; function initializeWorker() { const redisOptions = { @@ -34,6 +36,19 @@ function initializeWorker() { maxAttempts: 3, }, }, + completeBatchTaskRunItem: { + schema: z.object({ + itemId: z.string(), + batchTaskRunId: z.string(), + scheduleResumeOnComplete: z.boolean(), + taskRunAttemptId: z.string().optional(), + attempt: z.number().optional(), + }), + visibilityTimeoutMs: 60_000, + retry: { + maxAttempts: 10, + }, + }, }, concurrency: { workers: env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS, @@ -49,6 +64,16 @@ function initializeWorker() { await service.call(payload.runId); }, + completeBatchTaskRunItem: async ({ payload, attempt }) => { + await completeBatchTaskRunItemV3( + payload.itemId, + payload.batchTaskRunId, + prisma, + payload.scheduleResumeOnComplete, + payload.taskRunAttemptId, + attempt + ); + }, }, }); diff --git a/apps/webapp/app/v3/services/batchTriggerV3.server.ts b/apps/webapp/app/v3/services/batchTriggerV3.server.ts index 9bcb9eab56..83bc67b22b 100644 --- a/apps/webapp/app/v3/services/batchTriggerV3.server.ts +++ b/apps/webapp/app/v3/services/batchTriggerV3.server.ts @@ -7,6 +7,8 @@ import { } from "@trigger.dev/core/v3"; import { BatchTaskRun, + isPrismaRaceConditionError, + isPrismaRetriableError, isUniqueConstraintError, Prisma, TaskRunAttempt, @@ -20,6 +22,7 @@ import { logger } from "~/services/logger.server"; import { getEntitlement } from "~/services/platform.v3.server"; import { workerQueue } from "~/services/worker.server"; import { generateFriendlyId } from "../friendlyIdentifiers"; +import { legacyRunEngineWorker } from "../legacyRunEngineWorker.server"; import { marqs } from "../marqs/index.server"; import { guardQueueSizeLimitsForEnv } from "../queueSizeLimits.server"; import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../r2.server"; @@ -923,71 +926,123 @@ export async function completeBatchTaskRunItemV3( batchTaskRunId: string, tx: PrismaClientOrTransaction, scheduleResumeOnComplete = false, - taskRunAttemptId?: string + taskRunAttemptId?: string, + retryAttempt?: number ) { - await $transaction( - tx, - "completeBatchTaskRunItemV3", - async (tx, span) => { - span?.setAttribute("batch_id", batchTaskRunId); - - // Update the item to complete - const updated = await tx.batchTaskRunItem.updateMany({ - where: { - id: itemId, - status: "PENDING", - }, - data: { - status: "COMPLETED", - taskRunAttemptId, - }, - }); + const isRetry = retryAttempt !== undefined; + + if (isRetry) { + logger.debug("completeBatchTaskRunItemV3 retrying", { + itemId, + batchTaskRunId, + scheduleResumeOnComplete, + taskRunAttemptId, + retryAttempt, + }); + } - if (updated.count === 0) { - return; - } + try { + await $transaction( + tx, + "completeBatchTaskRunItemV3", + async (tx, span) => { + span?.setAttribute("batch_id", batchTaskRunId); - const updatedBatchRun = await tx.batchTaskRun.update({ - where: { - id: batchTaskRunId, - }, - data: { - completedCount: { - increment: 1, + // Update the item to complete + const updated = await tx.batchTaskRunItem.updateMany({ + where: { + id: itemId, + status: "PENDING", }, - }, - select: { - sealed: true, - status: true, - completedCount: true, - expectedCount: true, - dependentTaskAttemptId: true, - }, - }); + data: { + status: "COMPLETED", + taskRunAttemptId, + }, + }); - if ( - updatedBatchRun.status === "PENDING" && - updatedBatchRun.completedCount === updatedBatchRun.expectedCount && - updatedBatchRun.sealed - ) { - await tx.batchTaskRun.update({ + if (updated.count === 0) { + return; + } + + const updatedBatchRun = await tx.batchTaskRun.update({ where: { id: batchTaskRunId, }, data: { - status: "COMPLETED", - completedAt: new Date(), + completedCount: { + increment: 1, + }, + }, + select: { + sealed: true, + status: true, + completedCount: true, + expectedCount: true, + dependentTaskAttemptId: true, }, }); - // We only need to resume the batch if it has a dependent task attempt ID - if (scheduleResumeOnComplete && updatedBatchRun.dependentTaskAttemptId) { - await ResumeBatchRunService.enqueue(batchTaskRunId, true, tx); + if ( + updatedBatchRun.status === "PENDING" && + updatedBatchRun.completedCount === updatedBatchRun.expectedCount && + updatedBatchRun.sealed + ) { + await tx.batchTaskRun.update({ + where: { + id: batchTaskRunId, + }, + data: { + status: "COMPLETED", + completedAt: new Date(), + }, + }); + + // We only need to resume the batch if it has a dependent task attempt ID + if (scheduleResumeOnComplete && updatedBatchRun.dependentTaskAttemptId) { + await ResumeBatchRunService.enqueue(batchTaskRunId, true, tx); + } } + }, + { + timeout: 10_000, + maxWait: 4_000, } - }, - { - timeout: 10000, + ); + } catch (error) { + if (isPrismaRetriableError(error) || isPrismaRaceConditionError(error)) { + logger.error("completeBatchTaskRunItemV3 failed with a Prisma Error, scheduling a retry", { + itemId, + batchTaskRunId, + error, + retryAttempt, + isRetry, + }); + + if (isRetry) { + //throwing this error will cause the Redis worker to retry the job + throw error; + } else { + //schedule a retry + await legacyRunEngineWorker.enqueue({ + id: `completeBatchTaskRunItem:${itemId}`, + job: "completeBatchTaskRunItem", + payload: { + itemId, + batchTaskRunId, + scheduleResumeOnComplete, + taskRunAttemptId, + }, + availableAt: new Date(Date.now() + 2_000), + }); + } + } else { + logger.error("completeBatchTaskRunItemV3 failed with a non-retriable error", { + itemId, + batchTaskRunId, + error, + retryAttempt, + isRetry, + }); } - ); + } } diff --git a/internal-packages/database/src/transaction.ts b/internal-packages/database/src/transaction.ts index e4cc2aa4df..beb9e26751 100644 --- a/internal-packages/database/src/transaction.ts +++ b/internal-packages/database/src/transaction.ts @@ -13,12 +13,38 @@ function isTransactionClient(prisma: PrismaClientOrTransaction): prisma is Prism return !("$transaction" in prisma); } -function isPrismaKnownError(error: unknown): error is Prisma.PrismaClientKnownRequestError { +export function isPrismaKnownError(error: unknown): error is Prisma.PrismaClientKnownRequestError { return ( typeof error === "object" && error !== null && "code" in error && typeof error.code === "string" ); } +/* +• P2024: Connection timeout errors +• P2028: Transaction timeout errors +• P2034: Transaction deadlock/conflict errors +*/ +const retryCodes = ["P2024", "P2028", "P2034"]; + +export function isPrismaRetriableError(error: unknown): boolean { + if (!isPrismaKnownError(error)) { + return false; + } + + return retryCodes.includes(error.code); +} + +/* +• P2025: Record not found errors (in race conditions) [not included for now] +*/ +export function isPrismaRaceConditionError(error: unknown): boolean { + if (!isPrismaKnownError(error)) { + return false; + } + + return error.code === "P2025"; +} + export type PrismaTransactionOptions = { /** The maximum amount of time (in ms) Prisma Client will wait to acquire a transaction from the database. The default value is 2000ms. */ maxWait?: number; @@ -55,7 +81,7 @@ export async function $transaction( } catch (error) { if (isPrismaKnownError(error)) { if ( - error.code === "P2034" && + retryCodes.includes(error.code) && typeof options?.maxRetries === "number" && attempt < options.maxRetries ) { diff --git a/references/hello-world/src/trigger/example.ts b/references/hello-world/src/trigger/example.ts index 50d76069b0..3a19fd0946 100644 --- a/references/hello-world/src/trigger/example.ts +++ b/references/hello-world/src/trigger/example.ts @@ -68,3 +68,21 @@ export const maxDurationParentTask = task({ return result; }, }); + +export const batchTask = task({ + id: "batch", + run: async (payload: { count: number }, { ctx }) => { + logger.info("Starting batch task", { count: payload.count }); + + const items = Array.from({ length: payload.count }, (_, i) => ({ + payload: { message: `Batch item ${i + 1}` }, + })); + + const results = await childTask.batchTriggerAndWait(items); + + return { + batchCount: payload.count, + results, + }; + }, +});