Skip to content

Commit c49af77

Browse files
authored
Retry batch item completion (#1675)
* Added isPrismaRetriableError()
* Retry completeBatchTaskRunItem if it fails because of a retriable Prisma error
* Retry using the Redis worker
* Handle more retriable errors. Add a special condition for the race condition error
* Added Postgres connection_timeout with a default of 20s
* Added a simple batchTriggerAndWait example
1 parent f00ed9b commit c49af77

File tree

6 files changed

+181
-54
lines changed

6 files changed

+181
-54
lines changed

apps/webapp/app/db.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ function getClient() {
111111
const databaseUrl = extendQueryParams(DATABASE_URL, {
112112
connection_limit: env.DATABASE_CONNECTION_LIMIT.toString(),
113113
pool_timeout: env.DATABASE_POOL_TIMEOUT.toString(),
114+
connection_timeout: env.DATABASE_CONNECTION_TIMEOUT.toString(),
114115
});
115116

116117
console.log(`🔌 setting up prisma client to ${redactUrlSecrets(databaseUrl)}`);
@@ -162,6 +163,7 @@ function getReplicaClient() {
162163
const replicaUrl = extendQueryParams(env.DATABASE_READ_REPLICA_URL, {
163164
connection_limit: env.DATABASE_CONNECTION_LIMIT.toString(),
164165
pool_timeout: env.DATABASE_POOL_TIMEOUT.toString(),
166+
connection_timeout: env.DATABASE_CONNECTION_TIMEOUT.toString(),
165167
});
166168

167169
console.log(`🔌 setting up read replica connection to ${redactUrlSecrets(replicaUrl)}`);

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ const EnvironmentSchema = z.object({
1313
),
1414
DATABASE_CONNECTION_LIMIT: z.coerce.number().int().default(10),
1515
DATABASE_POOL_TIMEOUT: z.coerce.number().int().default(60),
16+
DATABASE_CONNECTION_TIMEOUT: z.coerce.number().int().default(20),
1617
DIRECT_URL: z
1718
.string()
1819
.refine(

apps/webapp/app/v3/legacyRunEngineWorker.server.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import { env } from "~/env.server";
55
import { logger } from "~/services/logger.server";
66
import { singleton } from "~/utils/singleton";
77
import { TaskRunHeartbeatFailedService } from "./taskRunHeartbeatFailed.server";
8+
import { completeBatchTaskRunItemV3 } from "./services/batchTriggerV3.server";
9+
import { prisma } from "~/db.server";
810

911
function initializeWorker() {
1012
const redisOptions = {
@@ -34,6 +36,19 @@ function initializeWorker() {
3436
maxAttempts: 3,
3537
},
3638
},
39+
completeBatchTaskRunItem: {
40+
schema: z.object({
41+
itemId: z.string(),
42+
batchTaskRunId: z.string(),
43+
scheduleResumeOnComplete: z.boolean(),
44+
taskRunAttemptId: z.string().optional(),
45+
attempt: z.number().optional(),
46+
}),
47+
visibilityTimeoutMs: 60_000,
48+
retry: {
49+
maxAttempts: 10,
50+
},
51+
},
3752
},
3853
concurrency: {
3954
workers: env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS,
@@ -49,6 +64,16 @@ function initializeWorker() {
4964

5065
await service.call(payload.runId);
5166
},
67+
completeBatchTaskRunItem: async ({ payload, attempt }) => {
68+
await completeBatchTaskRunItemV3(
69+
payload.itemId,
70+
payload.batchTaskRunId,
71+
prisma,
72+
payload.scheduleResumeOnComplete,
73+
payload.taskRunAttemptId,
74+
attempt
75+
);
76+
},
5277
},
5378
});
5479

apps/webapp/app/v3/services/batchTriggerV3.server.ts

Lines changed: 107 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import {
77
} from "@trigger.dev/core/v3";
88
import {
99
BatchTaskRun,
10+
isPrismaRaceConditionError,
11+
isPrismaRetriableError,
1012
isUniqueConstraintError,
1113
Prisma,
1214
TaskRunAttempt,
@@ -20,6 +22,7 @@ import { logger } from "~/services/logger.server";
2022
import { getEntitlement } from "~/services/platform.v3.server";
2123
import { workerQueue } from "~/services/worker.server";
2224
import { generateFriendlyId } from "../friendlyIdentifiers";
25+
import { legacyRunEngineWorker } from "../legacyRunEngineWorker.server";
2326
import { marqs } from "../marqs/index.server";
2427
import { guardQueueSizeLimitsForEnv } from "../queueSizeLimits.server";
2528
import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../r2.server";
@@ -923,71 +926,123 @@ export async function completeBatchTaskRunItemV3(
923926
batchTaskRunId: string,
924927
tx: PrismaClientOrTransaction,
925928
scheduleResumeOnComplete = false,
926-
taskRunAttemptId?: string
929+
taskRunAttemptId?: string,
930+
retryAttempt?: number
927931
) {
928-
await $transaction(
929-
tx,
930-
"completeBatchTaskRunItemV3",
931-
async (tx, span) => {
932-
span?.setAttribute("batch_id", batchTaskRunId);
933-
934-
// Update the item to complete
935-
const updated = await tx.batchTaskRunItem.updateMany({
936-
where: {
937-
id: itemId,
938-
status: "PENDING",
939-
},
940-
data: {
941-
status: "COMPLETED",
942-
taskRunAttemptId,
943-
},
944-
});
932+
const isRetry = retryAttempt !== undefined;
933+
934+
if (isRetry) {
935+
logger.debug("completeBatchTaskRunItemV3 retrying", {
936+
itemId,
937+
batchTaskRunId,
938+
scheduleResumeOnComplete,
939+
taskRunAttemptId,
940+
retryAttempt,
941+
});
942+
}
945943

946-
if (updated.count === 0) {
947-
return;
948-
}
944+
try {
945+
await $transaction(
946+
tx,
947+
"completeBatchTaskRunItemV3",
948+
async (tx, span) => {
949+
span?.setAttribute("batch_id", batchTaskRunId);
949950

950-
const updatedBatchRun = await tx.batchTaskRun.update({
951-
where: {
952-
id: batchTaskRunId,
953-
},
954-
data: {
955-
completedCount: {
956-
increment: 1,
951+
// Update the item to complete
952+
const updated = await tx.batchTaskRunItem.updateMany({
953+
where: {
954+
id: itemId,
955+
status: "PENDING",
957956
},
958-
},
959-
select: {
960-
sealed: true,
961-
status: true,
962-
completedCount: true,
963-
expectedCount: true,
964-
dependentTaskAttemptId: true,
965-
},
966-
});
957+
data: {
958+
status: "COMPLETED",
959+
taskRunAttemptId,
960+
},
961+
});
967962

968-
if (
969-
updatedBatchRun.status === "PENDING" &&
970-
updatedBatchRun.completedCount === updatedBatchRun.expectedCount &&
971-
updatedBatchRun.sealed
972-
) {
973-
await tx.batchTaskRun.update({
963+
if (updated.count === 0) {
964+
return;
965+
}
966+
967+
const updatedBatchRun = await tx.batchTaskRun.update({
974968
where: {
975969
id: batchTaskRunId,
976970
},
977971
data: {
978-
status: "COMPLETED",
979-
completedAt: new Date(),
972+
completedCount: {
973+
increment: 1,
974+
},
975+
},
976+
select: {
977+
sealed: true,
978+
status: true,
979+
completedCount: true,
980+
expectedCount: true,
981+
dependentTaskAttemptId: true,
980982
},
981983
});
982984

983-
// We only need to resume the batch if it has a dependent task attempt ID
984-
if (scheduleResumeOnComplete && updatedBatchRun.dependentTaskAttemptId) {
985-
await ResumeBatchRunService.enqueue(batchTaskRunId, true, tx);
985+
if (
986+
updatedBatchRun.status === "PENDING" &&
987+
updatedBatchRun.completedCount === updatedBatchRun.expectedCount &&
988+
updatedBatchRun.sealed
989+
) {
990+
await tx.batchTaskRun.update({
991+
where: {
992+
id: batchTaskRunId,
993+
},
994+
data: {
995+
status: "COMPLETED",
996+
completedAt: new Date(),
997+
},
998+
});
999+
1000+
// We only need to resume the batch if it has a dependent task attempt ID
1001+
if (scheduleResumeOnComplete && updatedBatchRun.dependentTaskAttemptId) {
1002+
await ResumeBatchRunService.enqueue(batchTaskRunId, true, tx);
1003+
}
9861004
}
1005+
},
1006+
{
1007+
timeout: 10_000,
1008+
maxWait: 4_000,
9871009
}
988-
},
989-
{
990-
timeout: 10000,
1010+
);
1011+
} catch (error) {
1012+
if (isPrismaRetriableError(error) || isPrismaRaceConditionError(error)) {
1013+
logger.error("completeBatchTaskRunItemV3 failed with a Prisma Error, scheduling a retry", {
1014+
itemId,
1015+
batchTaskRunId,
1016+
error,
1017+
retryAttempt,
1018+
isRetry,
1019+
});
1020+
1021+
if (isRetry) {
1022+
//throwing this error will cause the Redis worker to retry the job
1023+
throw error;
1024+
} else {
1025+
//schedule a retry
1026+
await legacyRunEngineWorker.enqueue({
1027+
id: `completeBatchTaskRunItem:${itemId}`,
1028+
job: "completeBatchTaskRunItem",
1029+
payload: {
1030+
itemId,
1031+
batchTaskRunId,
1032+
scheduleResumeOnComplete,
1033+
taskRunAttemptId,
1034+
},
1035+
availableAt: new Date(Date.now() + 2_000),
1036+
});
1037+
}
1038+
} else {
1039+
logger.error("completeBatchTaskRunItemV3 failed with a non-retriable error", {
1040+
itemId,
1041+
batchTaskRunId,
1042+
error,
1043+
retryAttempt,
1044+
isRetry,
1045+
});
9911046
}
992-
);
1047+
}
9931048
}

internal-packages/database/src/transaction.ts

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,38 @@ function isTransactionClient(prisma: PrismaClientOrTransaction): prisma is Prism
1313
return !("$transaction" in prisma);
1414
}
1515

16-
function isPrismaKnownError(error: unknown): error is Prisma.PrismaClientKnownRequestError {
16+
export function isPrismaKnownError(error: unknown): error is Prisma.PrismaClientKnownRequestError {
1717
return (
1818
typeof error === "object" && error !== null && "code" in error && typeof error.code === "string"
1919
);
2020
}
2121

22+
/*
23+
• P2024: Timed out fetching a new connection from the connection pool
24+
• P2028: Transaction API errors (e.g. transaction timeouts)
25+
• P2034: Transaction deadlock/conflict errors
26+
*/
27+
const retryCodes = ["P2024", "P2028", "P2034"];
28+
29+
export function isPrismaRetriableError(error: unknown): boolean {
30+
if (!isPrismaKnownError(error)) {
31+
return false;
32+
}
33+
34+
return retryCodes.includes(error.code);
35+
}
36+
37+
/*
38+
• P2025: Record not found errors (can occur in race conditions) [not included in retryCodes for now]
39+
*/
40+
export function isPrismaRaceConditionError(error: unknown): boolean {
41+
if (!isPrismaKnownError(error)) {
42+
return false;
43+
}
44+
45+
return error.code === "P2025";
46+
}
47+
2248
export type PrismaTransactionOptions = {
2349
/** The maximum amount of time (in ms) Prisma Client will wait to acquire a transaction from the database. The default value is 2000ms. */
2450
maxWait?: number;
@@ -55,7 +81,7 @@ export async function $transaction<R>(
5581
} catch (error) {
5682
if (isPrismaKnownError(error)) {
5783
if (
58-
error.code === "P2034" &&
84+
retryCodes.includes(error.code) &&
5985
typeof options?.maxRetries === "number" &&
6086
attempt < options.maxRetries
6187
) {

references/hello-world/src/trigger/example.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,3 +68,21 @@ export const maxDurationParentTask = task({
6868
return result;
6969
},
7070
});
71+
72+
export const batchTask = task({
73+
id: "batch",
74+
run: async (payload: { count: number }, { ctx }) => {
75+
logger.info("Starting batch task", { count: payload.count });
76+
77+
const items = Array.from({ length: payload.count }, (_, i) => ({
78+
payload: { message: `Batch item ${i + 1}` },
79+
}));
80+
81+
const results = await childTask.batchTriggerAndWait(items);
82+
83+
return {
84+
batchCount: payload.count,
85+
results,
86+
};
87+
},
88+
});

0 commit comments

Comments
 (0)