Skip to content

Commit b67ea9a

Browse files
authored
improve batch completion system for run engine v1 (#1656)
* Automatically retry TriggerTaskService when hitting a unique constraint error on idempotency key * improve batch completion system for run engine v1 * Rename batch stuff to v3 so it's not confusing * Handle unique constraint error on BatchTaskRunItem creation and allow different limits for batchTrigger and batchTriggerAndWait
1 parent de31220 commit b67ea9a

File tree

24 files changed

+846
-368
lines changed

24 files changed

+846
-368
lines changed

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,7 @@ const EnvironmentSchema = z.object({
362362
MAXIMUM_DEV_QUEUE_SIZE: z.coerce.number().int().optional(),
363363
MAXIMUM_DEPLOYED_QUEUE_SIZE: z.coerce.number().int().optional(),
364364
MAX_BATCH_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),
365+
MAX_BATCH_AND_WAIT_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),
365366

366367
REALTIME_STREAM_VERSION: z.enum(["v1", "v2"]).default("v1"),
367368
BATCH_METADATA_OPERATIONS_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),

apps/webapp/app/presenters/v3/BatchListPresenter.server.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ export class BatchListPresenter extends BasePresenter {
100100
status: BatchTaskRunStatus;
101101
createdAt: Date;
102102
updatedAt: Date;
103+
completedAt: Date | null;
103104
runCount: BigInt;
104105
batchVersion: string;
105106
}[]
@@ -111,6 +112,7 @@ export class BatchListPresenter extends BasePresenter {
111112
b.status,
112113
b."createdAt",
113114
b."updatedAt",
115+
b."completedAt",
114116
b."runCount",
115117
b."batchVersion"
116118
FROM
@@ -196,7 +198,11 @@ WHERE
196198
createdAt: batch.createdAt.toISOString(),
197199
updatedAt: batch.updatedAt.toISOString(),
198200
hasFinished,
199-
finishedAt: hasFinished ? batch.updatedAt.toISOString() : undefined,
201+
finishedAt: batch.completedAt
202+
? batch.completedAt.toISOString()
203+
: hasFinished
204+
? batch.updatedAt.toISOString()
205+
: undefined,
200206
status: batch.status,
201207
environment: displayableEnvironment(environment, userId),
202208
runCount: Number(batch.runCount),

apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ const { action, loader } = createActionApiRoute(
7474

7575
const idempotencyKeyExpiresAt = resolveIdempotencyKeyTTL(idempotencyKeyTTL);
7676

77-
const run = await service.call(params.taskId, authentication.environment, body, {
77+
const result = await service.call(params.taskId, authentication.environment, body, {
7878
idempotencyKey: idempotencyKey ?? undefined,
7979
idempotencyKeyExpiresAt: idempotencyKeyExpiresAt,
8080
triggerVersion: triggerVersion ?? undefined,
@@ -83,19 +83,20 @@ const { action, loader } = createActionApiRoute(
8383
oneTimeUseToken,
8484
});
8585

86-
if (!run) {
86+
if (!result) {
8787
return json({ error: "Task not found" }, { status: 404 });
8888
}
8989

9090
const $responseHeaders = await responseHeaders(
91-
run,
91+
result.run,
9292
authentication.environment,
9393
triggerClient
9494
);
9595

9696
return json(
9797
{
98-
id: run.friendlyId,
98+
id: result.run.friendlyId,
99+
isCached: result.isCached,
99100
},
100101
{
101102
headers: $responseHeaders,

apps/webapp/app/routes/api.v1.tasks.batch.ts

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
1212
import { ServiceValidationError } from "~/v3/services/baseService.server";
1313
import {
1414
BatchProcessingStrategy,
15-
BatchTriggerV2Service,
16-
} from "~/v3/services/batchTriggerV2.server";
15+
BatchTriggerV3Service,
16+
} from "~/v3/services/batchTriggerV3.server";
1717
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
1818
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
1919

@@ -40,13 +40,24 @@ const { action, loader } = createActionApiRoute(
4040
}
4141

4242
// Check the there are fewer than MAX_BATCH_V2_TRIGGER_ITEMS items
43-
if (body.items.length > env.MAX_BATCH_V2_TRIGGER_ITEMS) {
44-
return json(
45-
{
46-
error: `Batch size of ${body.items.length} is too large. Maximum allowed batch size is ${env.MAX_BATCH_V2_TRIGGER_ITEMS}.`,
47-
},
48-
{ status: 400 }
49-
);
43+
if (body.dependentAttempt) {
44+
if (body.items.length > env.MAX_BATCH_AND_WAIT_V2_TRIGGER_ITEMS) {
45+
return json(
46+
{
47+
error: `Batch size of ${body.items.length} is too large. Maximum allowed batch size is ${env.MAX_BATCH_AND_WAIT_V2_TRIGGER_ITEMS} when batchTriggerAndWait.`,
48+
},
49+
{ status: 400 }
50+
);
51+
}
52+
} else {
53+
if (body.items.length > env.MAX_BATCH_V2_TRIGGER_ITEMS) {
54+
return json(
55+
{
56+
error: `Batch size of ${body.items.length} is too large. Maximum allowed batch size is ${env.MAX_BATCH_V2_TRIGGER_ITEMS}.`,
57+
},
58+
{ status: 400 }
59+
);
60+
}
5061
}
5162

5263
const {
@@ -85,7 +96,7 @@ const { action, loader } = createActionApiRoute(
8596
resolveIdempotencyKeyTTL(idempotencyKeyTTL) ??
8697
new Date(Date.now() + 24 * 60 * 60 * 1000 * 30);
8798

88-
const service = new BatchTriggerV2Service(batchProcessingStrategy ?? undefined);
99+
const service = new BatchTriggerV3Service(batchProcessingStrategy ?? undefined);
89100

90101
try {
91102
const batch = await service.call(authentication.environment, body, {
@@ -118,7 +129,7 @@ const { action, loader } = createActionApiRoute(
118129
return json({ error: error.message }, { status: 422 });
119130
} else if (error instanceof Error) {
120131
return json(
121-
{ error: error.message },
132+
{ error: "Something went wrong" },
122133
{ status: 500, headers: { "x-should-retry": "false" } }
123134
);
124135
}

apps/webapp/app/services/worker.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ import {
5555
CancelDevSessionRunsServiceOptions,
5656
} from "~/v3/services/cancelDevSessionRuns.server";
5757
import { logger } from "./logger.server";
58-
import { BatchProcessingOptions, BatchTriggerV2Service } from "~/v3/services/batchTriggerV2.server";
58+
import { BatchProcessingOptions, BatchTriggerV3Service } from "~/v3/services/batchTriggerV3.server";
5959

6060
const workerCatalog = {
6161
indexEndpoint: z.object({
@@ -733,7 +733,7 @@ function getWorkerQueue() {
733733
priority: 0,
734734
maxAttempts: 5,
735735
handler: async (payload, job) => {
736-
const service = new BatchTriggerV2Service(payload.strategy);
736+
const service = new BatchTriggerV3Service(payload.strategy);
737737

738738
await service.processBatchTaskRun(payload);
739739
},

apps/webapp/app/v3/services/batchTriggerTask.server.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ export class BatchTriggerTaskService extends BaseService {
104104

105105
for (const item of body.items) {
106106
try {
107-
const run = await triggerTaskService.call(
107+
const result = await triggerTaskService.call(
108108
taskId,
109109
environment,
110110
{
@@ -123,16 +123,16 @@ export class BatchTriggerTaskService extends BaseService {
123123
}
124124
);
125125

126-
if (run) {
126+
if (result) {
127127
await this._prisma.batchTaskRunItem.create({
128128
data: {
129129
batchTaskRunId: batch.id,
130-
taskRunId: run.id,
131-
status: batchTaskRunItemStatusForRunStatus(run.status),
130+
taskRunId: result.run.id,
131+
status: batchTaskRunItemStatusForRunStatus(result.run.status),
132132
},
133133
});
134134

135-
runs.push(run.friendlyId);
135+
runs.push(result.run.friendlyId);
136136
}
137137

138138
index++;

0 commit comments

Comments
 (0)