Skip to content

Commit f96bf72

Browse files
authored
Engine v1 improvements (#1627)
* Efficiency improvements to SharedQueueConsumer and MarQS for run engine v1 * Reliabily resolve task queue for a run and ack runs where we can't find the queue * engine v1: stop nacking after reaching the max nack count, ack to remove the message * Handle division by 0 possibility in distributeQueues
1 parent c855e4a commit f96bf72

19 files changed

+1081
-820
lines changed

apps/webapp/app/env.server.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,8 @@ const EnvironmentSchema = z.object({
162162
SHARED_QUEUE_CONSUMER_POOL_SIZE: z.coerce.number().int().default(10),
163163
SHARED_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(100),
164164
SHARED_QUEUE_CONSUMER_NEXT_TICK_INTERVAL_MS: z.coerce.number().int().default(100),
165+
SHARED_QUEUE_CONSUMER_EMIT_RESUME_DEPENDENCY_TIMEOUT_MS: z.coerce.number().int().default(1000),
166+
SHARED_QUEUE_CONSUMER_RESOLVE_PAYLOADS_BATCH_SIZE: z.coerce.number().int().default(25),
165167

166168
// Development OTEL environment variables
167169
DEV_OTEL_EXPORTER_OTLP_ENDPOINT: z.string().optional(),
@@ -219,6 +221,10 @@ const EnvironmentSchema = z.object({
219221
.number()
220222
.int()
221223
.default(60 * 1000 * 15),
224+
MARQS_SHARED_QUEUE_SELECTION_COUNT: z.coerce.number().int().default(36),
225+
MARQS_DEV_QUEUE_SELECTION_COUNT: z.coerce.number().int().default(12),
226+
MARQS_MAXIMUM_NACK_COUNT: z.coerce.number().int().default(64),
227+
222228
PROD_TASK_HEARTBEAT_INTERVAL_MS: z.coerce.number().int().optional(),
223229

224230
VERBOSE_GRAPHILE_LOGGING: z.string().default("false"),
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import { QueueOptions } from "@trigger.dev/core/v3/schemas";
2+
import { TaskQueue } from "@trigger.dev/database";
3+
import { prisma } from "~/db.server";
4+
5+
export async function findQueueInEnvironment(
6+
queueName: string,
7+
environmentId: string,
8+
backgroundWorkerTaskId?: string,
9+
backgroundTask?: { queueConfig?: unknown }
10+
): Promise<TaskQueue | undefined> {
11+
const sanitizedQueueName = sanitizeQueueName(queueName);
12+
13+
const queue = await prisma.taskQueue.findFirst({
14+
where: {
15+
runtimeEnvironmentId: environmentId,
16+
name: sanitizedQueueName,
17+
},
18+
});
19+
20+
if (queue) {
21+
return queue;
22+
}
23+
24+
const task = backgroundTask
25+
? backgroundTask
26+
: backgroundWorkerTaskId
27+
? await prisma.backgroundWorkerTask.findFirst({
28+
where: {
29+
id: backgroundWorkerTaskId,
30+
},
31+
})
32+
: undefined;
33+
34+
if (!task) {
35+
return;
36+
}
37+
38+
const queueConfig = QueueOptions.safeParse(task.queueConfig);
39+
40+
if (queueConfig.success) {
41+
const taskQueueName = queueConfig.data.name
42+
? sanitizeQueueName(queueConfig.data.name)
43+
: undefined;
44+
45+
if (taskQueueName && taskQueueName !== sanitizedQueueName) {
46+
const queue = await prisma.taskQueue.findFirst({
47+
where: {
48+
runtimeEnvironmentId: environmentId,
49+
name: taskQueueName,
50+
},
51+
});
52+
53+
if (queue) {
54+
return queue;
55+
}
56+
}
57+
}
58+
}
59+
60+
// Only allow alphanumeric characters, underscores, hyphens, and slashes (and only the first 128 characters)
61+
export function sanitizeQueueName(queueName: string) {
62+
return queueName.replace(/[^a-zA-Z0-9_\-\/]/g, "").substring(0, 128);
63+
}

apps/webapp/app/routes/admin.api.v1.marqs.ts

Lines changed: 0 additions & 31 deletions
This file was deleted.

apps/webapp/app/services/apiAuth.server.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import {
1717
isPersonalAccessToken,
1818
} from "./personalAccessToken.server";
1919
import { isPublicJWT, validatePublicJwtKey } from "./realtime/jwtAuth.server";
20+
import { RuntimeEnvironmentForEnvRepo } from "~/v3/environmentVariables/environmentVariablesRepository.server";
2021

2122
const ClaimsSchema = z.object({
2223
scopes: z.array(z.string()).optional(),
@@ -410,7 +411,7 @@ const JWT_ALGORITHM = "HS256";
410411
const DEFAULT_JWT_EXPIRATION_IN_MS = 1000 * 60 * 60; // 1 hour
411412

412413
export async function generateJWTTokenForEnvironment(
413-
environment: RuntimeEnvironment,
414+
environment: RuntimeEnvironmentForEnvRepo,
414415
payload: Record<string, string>
415416
) {
416417
const jwt = await new SignJWT({

apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -654,9 +654,26 @@ export class EnvironmentVariablesRepository implements Repository {
654654
}
655655
}
656656

657+
export const RuntimeEnvironmentForEnvRepoPayload = {
658+
select: {
659+
id: true,
660+
slug: true,
661+
type: true,
662+
projectId: true,
663+
apiKey: true,
664+
organizationId: true,
665+
},
666+
} as const;
667+
668+
export type RuntimeEnvironmentForEnvRepo = Prisma.RuntimeEnvironmentGetPayload<
669+
typeof RuntimeEnvironmentForEnvRepoPayload
670+
>;
671+
657672
export const environmentVariablesRepository = new EnvironmentVariablesRepository();
658673

659-
export async function resolveVariablesForEnvironment(runtimeEnvironment: RuntimeEnvironment) {
674+
export async function resolveVariablesForEnvironment(
675+
runtimeEnvironment: RuntimeEnvironmentForEnvRepo
676+
) {
660677
const projectSecrets = await environmentVariablesRepository.getEnvironmentVariables(
661678
runtimeEnvironment.projectId,
662679
runtimeEnvironment.id
@@ -672,7 +689,9 @@ export async function resolveVariablesForEnvironment(runtimeEnvironment: Runtime
672689
return [...overridableTriggerVariables, ...projectSecrets, ...builtInVariables];
673690
}
674691

675-
async function resolveOverridableTriggerVariables(runtimeEnvironment: RuntimeEnvironment) {
692+
async function resolveOverridableTriggerVariables(
693+
runtimeEnvironment: RuntimeEnvironmentForEnvRepo
694+
) {
676695
let result: Array<EnvironmentVariable> = [
677696
{
678697
key: "TRIGGER_REALTIME_STREAM_VERSION",
@@ -683,7 +702,7 @@ async function resolveOverridableTriggerVariables(runtimeEnvironment: RuntimeEnv
683702
return result;
684703
}
685704

686-
async function resolveBuiltInDevVariables(runtimeEnvironment: RuntimeEnvironment) {
705+
async function resolveBuiltInDevVariables(runtimeEnvironment: RuntimeEnvironmentForEnvRepo) {
687706
let result: Array<EnvironmentVariable> = [
688707
{
689708
key: "OTEL_EXPORTER_OTLP_ENDPOINT",
@@ -745,7 +764,7 @@ async function resolveBuiltInDevVariables(runtimeEnvironment: RuntimeEnvironment
745764
return [...result, ...commonVariables];
746765
}
747766

748-
async function resolveBuiltInProdVariables(runtimeEnvironment: RuntimeEnvironment) {
767+
async function resolveBuiltInProdVariables(runtimeEnvironment: RuntimeEnvironmentForEnvRepo) {
749768
let result: Array<EnvironmentVariable> = [
750769
{
751770
key: "TRIGGER_SECRET_KEY",
@@ -838,7 +857,7 @@ async function resolveBuiltInProdVariables(runtimeEnvironment: RuntimeEnvironmen
838857
}
839858

840859
async function resolveCommonBuiltInVariables(
841-
runtimeEnvironment: RuntimeEnvironment
860+
runtimeEnvironment: RuntimeEnvironmentForEnvRepo
842861
): Promise<Array<EnvironmentVariable>> {
843862
return [];
844863
}

apps/webapp/app/v3/marqs/devQueueConsumer.server.ts

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,15 @@ import { prisma } from "~/db.server";
1313
import { createNewSession, disconnectSession } from "~/models/runtimeEnvironment.server";
1414
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
1515
import { logger } from "~/services/logger.server";
16-
import { marqs, sanitizeQueueName } from "~/v3/marqs/index.server";
16+
import { marqs } from "~/v3/marqs/index.server";
1717
import { resolveVariablesForEnvironment } from "../environmentVariables/environmentVariablesRepository.server";
1818
import { FailedTaskRunService } from "../failedTaskRun.server";
1919
import { CancelDevSessionRunsService } from "../services/cancelDevSessionRuns.server";
2020
import { CompleteAttemptService } from "../services/completeAttempt.server";
21-
import {
22-
SEMINTATTRS_FORCE_RECORDING,
23-
attributesFromAuthenticatedEnv,
24-
tracer,
25-
} from "../tracer.server";
26-
import { DevSubscriber, devPubSub } from "./devPubSub.server";
21+
import { attributesFromAuthenticatedEnv, tracer } from "../tracer.server";
2722
import { getMaxDuration } from "../utils/maxDuration";
23+
import { DevSubscriber, devPubSub } from "./devPubSub.server";
24+
import { findQueueInEnvironment, sanitizeQueueName } from "~/models/taskQueue.server";
2825

2926
const MessageBody = z.discriminatedUnion("type", [
3027
z.object({
@@ -436,14 +433,12 @@ export class DevQueueConsumer {
436433
return;
437434
}
438435

439-
const queue = await prisma.taskQueue.findUnique({
440-
where: {
441-
runtimeEnvironmentId_name: {
442-
runtimeEnvironmentId: this.env.id,
443-
name: sanitizeQueueName(lockedTaskRun.queue),
444-
},
445-
},
446-
});
436+
const queue = await findQueueInEnvironment(
437+
lockedTaskRun.queue,
438+
this.env.id,
439+
backgroundTask.id,
440+
backgroundTask
441+
);
447442

448443
if (!queue) {
449444
logger.debug("[DevQueueConsumer] Failed to find queue", {

0 commit comments

Comments
 (0)