Skip to content

Commit 2b3ea69

Browse files
authored
v4: dequeue performance improvements (split concurrency from dequeue) (#2127)
* WIP * Run queue now works with the worker queue / master queue split * Acking should also cause the master queue to be processed * Convert run engine tests and run engine to use runQueue changes * Include the util files in the test tsconfig * coordinator target should be es2020 as well * providers target 2020 * Fix the triggerTask tests in the webapp * v4 now working with the new worker queues, and added the legacy master queue migration stuff * report worker queue lengths via opentelemetry metrics * Adding lock metrics * Release concurrency bucket metrics * • Updated RunQueue.removeEnvironmentQueuesFromMasterQueue() method signature to take runtimeEnvironmentId instead of masterQueue parameter • Added automatic master queue shard calculation using this.keys.masterQueueKeyForEnvironment(runtimeEnvironmentId, this.shardCount) • Updated RunEngine wrapper method to use new runtimeEnvironmentId parameter • Updated DeleteProjectService to call the method once per environment instead of once per master queue • Simplified API by encapsulating master queue sharding logic within RunQueue class * metrics now working, configure the run queue settings, additional metrics for run engine and redis-worker * Fix CodeRabbit suggestions * return undefined from dequeueFromWorkerQueue, not null * Remove message from worker queue in certain circumstances when acking * Update log * Ensure master queue consumers cannot stop from a processing error, and make the consumer interval configurable via an env var * Change how the run queue master queue consumers are disabled internally * Fixed tests * process the queue on nack * Fix more tests * Fix priority tests * Fixed dequeueing test
1 parent f603725 commit 2b3ea69

File tree

72 files changed

+4328
-2824
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+4328
-2824
lines changed

.vscode/launch.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@
146146
"type": "node-terminal",
147147
"request": "launch",
148148
"name": "Debug RunQueue tests",
149-
"command": "pnpm run test ./src/run-queue/index.test.ts",
149+
"command": "pnpm run test ./src/run-queue/index.test.ts --run",
150150
"cwd": "${workspaceFolder}/internal-packages/run-engine",
151151
"sourceMaps": true
152152
},

apps/coordinator/tsconfig.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"compilerOptions": {
3-
"target": "es2018",
3+
"target": "es2020",
44
"module": "commonjs",
55
"esModuleInterop": true,
66
"resolveJsonModule": true,

apps/docker-provider/tsconfig.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"compilerOptions": {
3-
"target": "es2018",
3+
"target": "es2020",
44
"module": "commonjs",
55
"esModuleInterop": true,
66
"forceConsistentCasingInFileNames": true,

apps/kubernetes-provider/tsconfig.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"compilerOptions": {
3-
"target": "es2018",
3+
"target": "es2020",
44
"module": "commonjs",
55
"esModuleInterop": true,
66
"forceConsistentCasingInFileNames": true,

apps/webapp/app/env.server.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,10 @@ const EnvironmentSchema = z.object({
324324
INTERNAL_OTEL_TRACE_DISABLED: z.string().default("0"),
325325

326326
INTERNAL_OTEL_LOG_EXPORTER_URL: z.string().optional(),
327+
INTERNAL_OTEL_METRIC_EXPORTER_URL: z.string().optional(),
328+
INTERNAL_OTEL_METRIC_EXPORTER_AUTH_HEADERS: z.string().optional(),
329+
INTERNAL_OTEL_METRIC_EXPORTER_ENABLED: z.string().default("0"),
330+
INTERNAL_OTEL_METRIC_EXPORTER_INTERVAL_MS: z.coerce.number().int().default(30_000),
327331

328332
ORG_SLACK_INTEGRATION_CLIENT_ID: z.string().optional(),
329333
ORG_SLACK_INTEGRATION_CLIENT_SECRET: z.string().optional(),
@@ -460,8 +464,12 @@ const EnvironmentSchema = z.object({
460464
RUN_ENGINE_QUEUE_AGE_RANDOMIZATION_BIAS: z.coerce.number().default(0.25),
461465
RUN_ENGINE_REUSE_SNAPSHOT_COUNT: z.coerce.number().int().default(0),
462466
RUN_ENGINE_MAXIMUM_ENV_COUNT: z.coerce.number().int().optional(),
467+
RUN_ENGINE_RUN_QUEUE_SHARD_COUNT: z.coerce.number().int().default(4),
463468
RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
464469
RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS: z.coerce.number().int().default(30_000),
470+
RUN_ENGINE_PROCESS_WORKER_QUEUE_DEBOUNCE_MS: z.coerce.number().int().default(200),
471+
RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
472+
RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(500),
465473

466474
RUN_ENGINE_WORKER_REDIS_HOST: z
467475
.string()
@@ -617,6 +625,7 @@ const EnvironmentSchema = z.object({
617625
RUN_ENGINE_RELEASE_CONCURRENCY_BATCH_SIZE: z.coerce.number().int().default(10),
618626

619627
RUN_ENGINE_WORKER_ENABLED: z.string().default("1"),
628+
RUN_ENGINE_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
620629

621630
/** How long should the presence ttl last */
622631
DEV_PRESENCE_SSE_TIMEOUT: z.coerce.number().int().default(30_000),

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,7 @@ export class SpanPresenter extends BasePresenter {
145145
},
146146
},
147147
engine: true,
148-
masterQueue: true,
149-
secondaryMasterQueue: true,
148+
workerQueue: true,
150149
error: true,
151150
output: true,
152151
outputType: true,
@@ -364,8 +363,7 @@ export class SpanPresenter extends BasePresenter {
364363
maxDurationInSeconds: getMaxDuration(run.maxDurationInSeconds),
365364
batch: run.batch ? { friendlyId: run.batch.friendlyId } : undefined,
366365
engine: run.engine,
367-
masterQueue: run.masterQueue,
368-
secondaryMasterQueue: run.secondaryMasterQueue,
366+
workerQueue: run.workerQueue,
369367
spanId: run.spanId,
370368
isCached: !!span.originalRun,
371369
};
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
2+
import { prisma } from "~/db.server";
3+
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
4+
import { engine } from "~/v3/runEngine.server";
5+
6+
export async function action({ request }: ActionFunctionArgs) {
7+
// Next authenticate the request
8+
const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);
9+
10+
if (!authenticationResult) {
11+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
12+
}
13+
14+
const user = await prisma.user.findUnique({
15+
where: {
16+
id: authenticationResult.userId,
17+
},
18+
});
19+
20+
if (!user) {
21+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
22+
}
23+
24+
if (!user.admin) {
25+
return json({ error: "You must be an admin to perform this action" }, { status: 403 });
26+
}
27+
28+
try {
29+
await engine.migrateLegacyMasterQueues();
30+
31+
return json({
32+
success: true,
33+
});
34+
} catch (error) {
35+
return json({ error: error instanceof Error ? error.message : error }, { status: 400 });
36+
}
37+
}
Lines changed: 7 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1,92 +1,22 @@
11
import { json } from "@remix-run/server-runtime";
2-
import { DequeuedMessage, DevDequeueRequestBody, MachineResources } from "@trigger.dev/core/v3";
3-
import { BackgroundWorkerId } from "@trigger.dev/core/v3/isomorphic";
4-
import { env } from "~/env.server";
2+
import { DevDequeueRequestBody } from "@trigger.dev/core/v3";
53
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
64
import { engine } from "~/v3/runEngine.server";
75

86
const { action } = createActionApiRoute(
97
{
10-
body: DevDequeueRequestBody,
8+
body: DevDequeueRequestBody, // Even though we don't use it, we need to keep it for backwards compatibility
119
maxContentLength: 1024 * 10, // 10KB
1210
method: "POST",
1311
},
14-
async ({ authentication, body }) => {
15-
//we won't return more runs than this in one API call
16-
let maxDequeueCount = env.DEV_DEQUEUE_MAX_RUNS_PER_PULL;
17-
18-
//we can't use more than the max resources
19-
const availableResources = body.maxResources ?? {
20-
cpu: 8,
21-
memory: 16,
22-
};
23-
24-
let dequeuedMessages: DequeuedMessage[] = [];
25-
26-
//we need to check the current worker, because a run might have been locked to it
27-
const workers = body.oldWorkers.concat(body.currentWorker);
28-
29-
//first we want to clear out old runs
30-
for (const worker of workers) {
31-
//dequeue
32-
const latestResult = await engine.dequeueFromBackgroundWorkerMasterQueue({
33-
consumerId: authentication.environment.id,
34-
//specific version
35-
backgroundWorkerId: BackgroundWorkerId.toId(worker),
36-
maxRunCount: maxDequeueCount,
37-
maxResources: availableResources,
38-
});
39-
40-
//add runs to the array
41-
dequeuedMessages.push(...latestResult);
42-
43-
//update availableResources
44-
const consumedResources = latestResult.reduce(
45-
(acc, r) => {
46-
return {
47-
cpu: acc.cpu + r.run.machine.cpu,
48-
memory: acc.memory + r.run.machine.memory,
49-
};
50-
},
51-
{ cpu: 0, memory: 0 }
52-
);
53-
updateAvailableResources(availableResources, consumedResources);
54-
55-
//update maxDequeueCount
56-
maxDequeueCount -= latestResult.length;
57-
58-
//if we have no resources left, we exit the loop
59-
if (!hasAvailableResources(availableResources)) break;
60-
//we've already dequeued the max number of runs
61-
if (maxDequeueCount <= 0) break;
62-
}
63-
64-
//dequeue from the current version if we still have space
65-
if (hasAvailableResources(availableResources) && maxDequeueCount > 0) {
66-
const latestResult = await engine.dequeueFromEnvironmentMasterQueue({
67-
consumerId: authentication.environment.id,
68-
//current dev version (no specific version specified)
69-
environmentId: authentication.environment.id,
70-
maxRunCount: maxDequeueCount,
71-
maxResources: availableResources,
72-
});
73-
dequeuedMessages.push(...latestResult);
74-
}
12+
async ({ authentication }) => {
13+
const dequeuedMessages = await engine.dequeueFromEnvironmentWorkerQueue({
14+
consumerId: authentication.environment.id,
15+
environmentId: authentication.environment.id,
16+
});
7517

7618
return json({ dequeuedMessages }, { status: 200 });
7719
}
7820
);
7921

80-
function updateAvailableResources(
81-
availableResources: MachineResources,
82-
resources: MachineResources
83-
) {
84-
availableResources.cpu -= resources.cpu;
85-
availableResources.memory -= resources.memory;
86-
}
87-
88-
function hasAvailableResources(availableResources: MachineResources) {
89-
return availableResources.cpu > 0 && availableResources.memory > 0;
90-
}
91-
9222
export { action };
Lines changed: 3 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
import { json, TypedResponse } from "@remix-run/server-runtime";
2-
import { CURRENT_DEPLOYMENT_LABEL } from "@trigger.dev/core/v3/isomorphic";
32
import { WorkerApiDequeueResponseBody } from "@trigger.dev/core/v3/workers";
43
import { z } from "zod";
5-
import { $replica, prisma } from "~/db.server";
64
import { createLoaderWorkerApiRoute } from "~/services/routeBuilders/apiBuilder.server";
75

6+
// Keep this route for backwards compatibility
87
export const loader = createLoaderWorkerApiRoute(
98
{
109
params: z.object({
@@ -14,55 +13,7 @@ export const loader = createLoaderWorkerApiRoute(
1413
maxRunCount: z.coerce.number().optional(),
1514
}),
1615
},
17-
async ({
18-
authenticatedWorker,
19-
params,
20-
searchParams,
21-
}): Promise<TypedResponse<WorkerApiDequeueResponseBody>> => {
22-
const deployment = await $replica.workerDeployment.findUnique({
23-
where: {
24-
friendlyId: params.deploymentFriendlyId,
25-
},
26-
include: {
27-
worker: true,
28-
},
29-
});
30-
31-
if (!deployment) {
32-
throw new Error("Deployment not found");
33-
}
34-
35-
if (!deployment.worker) {
36-
throw new Error("Worker not found");
37-
}
38-
39-
const dequeuedMessages = (await isCurrentDeployment(deployment.id, deployment.environmentId))
40-
? await authenticatedWorker.dequeueFromEnvironment(
41-
deployment.worker.id,
42-
deployment.environmentId
43-
)
44-
: await authenticatedWorker.dequeueFromVersion(
45-
deployment.worker.id,
46-
searchParams.maxRunCount
47-
);
48-
49-
return json(dequeuedMessages);
16+
async (): Promise<TypedResponse<WorkerApiDequeueResponseBody>> => {
17+
return json([]);
5018
}
5119
);
52-
53-
async function isCurrentDeployment(deploymentId: string, environmentId: string): Promise<boolean> {
54-
const promotion = await prisma.workerDeploymentPromotion.findUnique({
55-
where: {
56-
environmentId_label: {
57-
environmentId,
58-
label: CURRENT_DEPLOYMENT_LABEL,
59-
},
60-
},
61-
});
62-
63-
if (!promotion) {
64-
return false;
65-
}
66-
67-
return promotion.deploymentId === deploymentId;
68-
}

apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,9 @@ import { createActionWorkerApiRoute } from "~/services/routeBuilders/apiBuilder.
77

88
export const action = createActionWorkerApiRoute(
99
{
10-
body: WorkerApiDequeueRequestBody,
10+
body: WorkerApiDequeueRequestBody, // Even though we don't use it, we need to keep it for backwards compatibility
1111
},
12-
async ({ authenticatedWorker, body }): Promise<TypedResponse<WorkerApiDequeueResponseBody>> => {
13-
return json(
14-
await authenticatedWorker.dequeue({
15-
maxResources: body.maxResources,
16-
maxRunCount: body.maxRunCount,
17-
})
18-
);
12+
async ({ authenticatedWorker }): Promise<TypedResponse<WorkerApiDequeueResponseBody>> => {
13+
return json(await authenticatedWorker.dequeue());
1914
}
2015
);

0 commit comments

Comments
 (0)