Skip to content

Commit 14e081e

Browse files
authored
release concurrency system reliability improvements (#2081)
* Refill release concurrency tokens when a run is cancelled
* Improve the release concurrency accounting system and add a sweeper that auto-refills tokens for snapshots that are no longer the latest snapshot on a run (e.g. the run has moved to a new snapshot state)
* Fix the order of arguments to the releasings sweeper
* Add a heartbeat for SUSPENDED snapshots which, when stalled, attempts to continue the run if it is unblocked
1 parent 500a173 commit 14e081e

15 files changed

+866
-129
lines changed

apps/webapp/app/components/admin/debugRun.tsx

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,15 @@ function DebugRunContent({ friendlyId }: { friendlyId: string }) {
6666
);
6767
}
6868

69-
function DebugRunData({
69+
/**
 * Chooses which debug table to render for a run.
 *
 * The loader tags its payload with an `engine` discriminator; "V1" payloads
 * are rendered by `DebugRunDataEngineV1` and everything else by
 * `DebugRunDataEngineV2`. All loader props are forwarded unchanged.
 */
function DebugRunData(props: UseDataFunctionReturn<typeof loader>) {
  return props.engine === "V1" ? (
    <DebugRunDataEngineV1 {...props} />
  ) : (
    <DebugRunDataEngineV2 {...props} />
  );
}
76+
77+
function DebugRunDataEngineV1({
7078
run,
7179
queueConcurrencyLimit,
7280
queueCurrentConcurrency,
@@ -338,3 +346,55 @@ function DebugRunData({
338346
</Property.Table>
339347
);
340348
}
349+
350+
/**
 * Debug table for runs whose loader payload is not tagged as engine "V1".
 *
 * Renders the run id, the queue and environment concurrency figures returned
 * by the loader, and one copyable row for every entry in `keys`.
 *
 * @param run - The run record returned by the loader; only `run.id` is shown.
 * @param queueConcurrencyLimit - Queue limit; shown as "Not set" when nullish.
 * @param queueCurrentConcurrency - Current queue concurrency; "0" when nullish.
 * @param envConcurrencyLimit - Environment limit; "Not set" when nullish.
 * @param envCurrentConcurrency - Current environment concurrency; "0" when nullish.
 * @param keys - `{ label, key }` pairs supplied by the loader; each is rendered
 *   as a labelled clipboard field.
 */
function DebugRunDataEngineV2({
  run,
  queueConcurrencyLimit,
  queueCurrentConcurrency,
  envConcurrencyLimit,
  envCurrentConcurrency,
  keys,
}: UseDataFunctionReturn<typeof loader>) {
  return (
    <Property.Table>
      <Property.Item>
        <Property.Label>ID</Property.Label>
        <Property.Value className="flex items-center gap-2">
          <ClipboardField value={run.id} variant="tertiary/small" iconButton />
        </Property.Value>
      </Property.Item>
      <Property.Item>
        <Property.Label>Queue current concurrency</Property.Label>
        <Property.Value className="flex items-center gap-2">
          <span>{queueCurrentConcurrency ?? "0"}</span>
        </Property.Value>
      </Property.Item>
      <Property.Item>
        <Property.Label>Queue concurrency limit</Property.Label>
        <Property.Value className="flex items-center gap-2">
          <span>{queueConcurrencyLimit ?? "Not set"}</span>
        </Property.Value>
      </Property.Item>
      <Property.Item>
        <Property.Label>Env current concurrency</Property.Label>
        <Property.Value className="flex items-center gap-2">
          <span>{envCurrentConcurrency ?? "0"}</span>
        </Property.Value>
      </Property.Item>
      <Property.Item>
        <Property.Label>Env concurrency limit</Property.Label>
        <Property.Value className="flex items-center gap-2">
          <span>{envConcurrencyLimit ?? "Not set"}</span>
        </Property.Value>
      </Property.Item>
      {keys.map((key) => (
        // Each mapped row needs a stable React key; the key string itself
        // identifies the row.
        <Property.Item key={key.key}>
          <Property.Label>{key.label}</Property.Label>
          <Property.Value className="flex items-center gap-2">
            <ClipboardField value={key.key} variant="tertiary/small" iconButton />
          </Property.Value>
        </Property.Item>
      ))}
    </Property.Table>
  );
}

apps/webapp/app/env.server.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,10 @@ const EnvironmentSchema = z.object({
452452
RUN_ENGINE_TIMEOUT_PENDING_CANCEL: z.coerce.number().int().default(60_000),
453453
RUN_ENGINE_TIMEOUT_EXECUTING: z.coerce.number().int().default(60_000),
454454
RUN_ENGINE_TIMEOUT_EXECUTING_WITH_WAITPOINTS: z.coerce.number().int().default(60_000),
455+
RUN_ENGINE_TIMEOUT_SUSPENDED: z.coerce
456+
.number()
457+
.int()
458+
.default(60_000 * 10),
455459
RUN_ENGINE_DEBUG_WORKER_NOTIFICATIONS: z.coerce.boolean().default(false),
456460
RUN_ENGINE_PARENT_QUEUE_LIMIT: z.coerce.number().int().default(1000),
457461
RUN_ENGINE_CONCURRENCY_LIMIT_BIAS: z.coerce.number().default(0.75),
@@ -605,6 +609,11 @@ const EnvironmentSchema = z.object({
605609
RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED: z.string().default("0"),
606610
RUN_ENGINE_RELEASE_CONCURRENCY_DISABLE_CONSUMERS: z.string().default("0"),
607611
RUN_ENGINE_RELEASE_CONCURRENCY_MAX_TOKENS_RATIO: z.coerce.number().default(1),
612+
RUN_ENGINE_RELEASE_CONCURRENCY_RELEASINGS_MAX_AGE: z.coerce
613+
.number()
614+
.int()
615+
.default(60_000 * 30),
616+
RUN_ENGINE_RELEASE_CONCURRENCY_RELEASINGS_POLL_INTERVAL: z.coerce.number().int().default(60_000),
608617
RUN_ENGINE_RELEASE_CONCURRENCY_MAX_RETRIES: z.coerce.number().int().default(3),
609618
RUN_ENGINE_RELEASE_CONCURRENCY_CONSUMERS_COUNT: z.coerce.number().int().default(1),
610619
RUN_ENGINE_RELEASE_CONCURRENCY_POLL_INTERVAL: z.coerce.number().int().default(500),

apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts

Lines changed: 132 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { z } from "zod";
44
import { $replica } from "~/db.server";
55
import { requireUserId } from "~/services/session.server";
66
import { marqs } from "~/v3/marqs/index.server";
7+
import { engine } from "~/v3/runEngine.server";
78

89
const ParamSchema = z.object({
910
runParam: z.string(),
@@ -17,6 +18,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
1718
where: { friendlyId: runParam, project: { organization: { members: { some: { userId } } } } },
1819
select: {
1920
id: true,
21+
engine: true,
2022
friendlyId: true,
2123
queue: true,
2224
concurrencyKey: true,
@@ -27,6 +29,8 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
2729
type: true,
2830
slug: true,
2931
organizationId: true,
32+
project: true,
33+
maximumConcurrencyLimit: true,
3034
organization: {
3135
select: {
3236
id: true,
@@ -41,33 +45,132 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
4145
throw new Response("Not Found", { status: 404 });
4246
}
4347

44-
const queueConcurrencyLimit = await marqs.getQueueConcurrencyLimit(
45-
run.runtimeEnvironment,
46-
run.queue
47-
);
48-
const envConcurrencyLimit = await marqs.getEnvConcurrencyLimit(run.runtimeEnvironment);
49-
const queueCurrentConcurrency = await marqs.currentConcurrencyOfQueue(
50-
run.runtimeEnvironment,
51-
run.queue,
52-
run.concurrencyKey ?? undefined
53-
);
54-
const envCurrentConcurrency = await marqs.currentConcurrencyOfEnvironment(run.runtimeEnvironment);
55-
56-
const queueReserveConcurrency = await marqs.reserveConcurrencyOfQueue(
57-
run.runtimeEnvironment,
58-
run.queue,
59-
run.concurrencyKey ?? undefined
60-
);
61-
62-
const envReserveConcurrency = await marqs.reserveConcurrencyOfEnvironment(run.runtimeEnvironment);
63-
64-
return typedjson({
65-
run,
66-
queueConcurrencyLimit,
67-
envConcurrencyLimit,
68-
queueCurrentConcurrency,
69-
envCurrentConcurrency,
70-
queueReserveConcurrency,
71-
envReserveConcurrency,
72-
});
48+
if (run.engine === "V1") {
49+
const queueConcurrencyLimit = await marqs.getQueueConcurrencyLimit(
50+
run.runtimeEnvironment,
51+
run.queue
52+
);
53+
const envConcurrencyLimit = await marqs.getEnvConcurrencyLimit(run.runtimeEnvironment);
54+
const queueCurrentConcurrency = await marqs.currentConcurrencyOfQueue(
55+
run.runtimeEnvironment,
56+
run.queue,
57+
run.concurrencyKey ?? undefined
58+
);
59+
const envCurrentConcurrency = await marqs.currentConcurrencyOfEnvironment(
60+
run.runtimeEnvironment
61+
);
62+
63+
const queueReserveConcurrency = await marqs.reserveConcurrencyOfQueue(
64+
run.runtimeEnvironment,
65+
run.queue,
66+
run.concurrencyKey ?? undefined
67+
);
68+
69+
const envReserveConcurrency = await marqs.reserveConcurrencyOfEnvironment(
70+
run.runtimeEnvironment
71+
);
72+
73+
return typedjson({
74+
engine: "V1",
75+
run,
76+
queueConcurrencyLimit,
77+
envConcurrencyLimit,
78+
queueCurrentConcurrency,
79+
envCurrentConcurrency,
80+
queueReserveConcurrency,
81+
envReserveConcurrency,
82+
keys: [],
83+
});
84+
} else {
85+
const queueConcurrencyLimit = await engine.runQueue.getQueueConcurrencyLimit(
86+
run.runtimeEnvironment,
87+
run.queue
88+
);
89+
90+
const envConcurrencyLimit = await engine.runQueue.getEnvConcurrencyLimit(
91+
run.runtimeEnvironment
92+
);
93+
94+
const queueCurrentConcurrency = await engine.runQueue.currentConcurrencyOfQueue(
95+
run.runtimeEnvironment,
96+
run.queue,
97+
run.concurrencyKey ?? undefined
98+
);
99+
100+
const envCurrentConcurrency = await engine.runQueue.currentConcurrencyOfEnvironment(
101+
run.runtimeEnvironment
102+
);
103+
104+
const queueCurrentConcurrencyKey = engine.runQueue.keys.currentConcurrencyKey(
105+
run.runtimeEnvironment,
106+
run.queue,
107+
run.concurrencyKey ?? undefined
108+
);
109+
110+
const envCurrentConcurrencyKey = engine.runQueue.keys.envCurrentConcurrencyKey(
111+
run.runtimeEnvironment
112+
);
113+
114+
const queueConcurrencyLimitKey = engine.runQueue.keys.queueConcurrencyLimitKey(
115+
run.runtimeEnvironment,
116+
run.queue
117+
);
118+
119+
const envConcurrencyLimitKey = engine.runQueue.keys.envConcurrencyLimitKey(
120+
run.runtimeEnvironment
121+
);
122+
123+
const releaseConcurrencyBucketKey = `engine:release-concurrency:org:${run.runtimeEnvironment.organizationId}:proj:${run.runtimeEnvironment.project.id}:env:${run.runtimeEnvironment.id}:bucket`;
124+
const releaseConcurrencyQueueKey = `engine:release-concurrency:org:${run.runtimeEnvironment.organizationId}:proj:${run.runtimeEnvironment.project.id}:env:${run.runtimeEnvironment.id}:queue`;
125+
const releaseConcurrencyMetadataKey = `engine:release-concurrency:org:${run.runtimeEnvironment.organizationId}:proj:${run.runtimeEnvironment.project.id}:env:${run.runtimeEnvironment.id}:metadata`;
126+
127+
const withPrefix = (key: string) => `engine:runqueue:${key}`;
128+
129+
const keys = [
130+
{
131+
label: "Queue current concurrency set",
132+
key: withPrefix(queueCurrentConcurrencyKey),
133+
},
134+
{
135+
label: "Env current concurrency set",
136+
key: withPrefix(envCurrentConcurrencyKey),
137+
},
138+
{
139+
label: "Queue concurrency limit",
140+
key: withPrefix(queueConcurrencyLimitKey),
141+
},
142+
{
143+
label: "Env concurrency limit",
144+
key: withPrefix(envConcurrencyLimitKey),
145+
},
146+
{
147+
label: "Release concurrency bucket",
148+
key: releaseConcurrencyBucketKey,
149+
},
150+
{
151+
label: "Release concurrency queue",
152+
key: releaseConcurrencyQueueKey,
153+
},
154+
{
155+
label: "Release concurrency metadata",
156+
key: releaseConcurrencyMetadataKey,
157+
},
158+
{
159+
label: "Release concurrency releasings",
160+
key: "engine:release-concurrency:releasings",
161+
},
162+
];
163+
164+
return typedjson({
165+
engine: "V2",
166+
run,
167+
queueConcurrencyLimit,
168+
envConcurrencyLimit,
169+
queueCurrentConcurrency,
170+
envCurrentConcurrency,
171+
queueReserveConcurrency: undefined,
172+
envReserveConcurrency: undefined,
173+
keys,
174+
});
175+
}
73176
}

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,14 @@ function createRunEngine() {
7676
PENDING_CANCEL: env.RUN_ENGINE_TIMEOUT_PENDING_CANCEL,
7777
EXECUTING: env.RUN_ENGINE_TIMEOUT_EXECUTING,
7878
EXECUTING_WITH_WAITPOINTS: env.RUN_ENGINE_TIMEOUT_EXECUTING_WITH_WAITPOINTS,
79+
SUSPENDED: env.RUN_ENGINE_TIMEOUT_SUSPENDED,
7980
},
8081
releaseConcurrency: {
8182
disabled: env.RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED === "0",
8283
disableConsumers: env.RUN_ENGINE_RELEASE_CONCURRENCY_DISABLE_CONSUMERS === "1",
8384
maxTokensRatio: env.RUN_ENGINE_RELEASE_CONCURRENCY_MAX_TOKENS_RATIO,
85+
releasingsMaxAge: env.RUN_ENGINE_RELEASE_CONCURRENCY_RELEASINGS_MAX_AGE,
86+
releasingsPollInterval: env.RUN_ENGINE_RELEASE_CONCURRENCY_RELEASINGS_POLL_INTERVAL,
8487
maxRetries: env.RUN_ENGINE_RELEASE_CONCURRENCY_MAX_RETRIES,
8588
consumersCount: env.RUN_ENGINE_RELEASE_CONCURRENCY_CONSUMERS_COUNT,
8689
pollInterval: env.RUN_ENGINE_RELEASE_CONCURRENCY_POLL_INTERVAL,

internal-packages/run-engine/src/engine/index.ts

Lines changed: 28 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ export class RunEngine {
182182
PENDING_CANCEL: 60_000,
183183
EXECUTING: 60_000,
184184
EXECUTING_WITH_WAITPOINTS: 60_000,
185+
SUSPENDED: 60_000 * 10,
185186
};
186187
this.heartbeatTimeouts = {
187188
...defaultHeartbeatTimeouts,
@@ -201,6 +202,9 @@ export class RunEngine {
201202

202203
this.releaseConcurrencySystem = new ReleaseConcurrencySystem({
203204
resources,
205+
maxTokensRatio: options.releaseConcurrency?.maxTokensRatio,
206+
releasingsMaxAge: options.releaseConcurrency?.releasingsMaxAge,
207+
releasingsPollInterval: options.releaseConcurrency?.releasingsPollInterval,
204208
queueOptions:
205209
typeof options.releaseConcurrency?.disabled === "boolean" &&
206210
options.releaseConcurrency.disabled
@@ -223,33 +227,6 @@ export class RunEngine {
223227
consumersCount: options.releaseConcurrency?.consumersCount ?? 1,
224228
pollInterval: options.releaseConcurrency?.pollInterval ?? 1000,
225229
batchSize: options.releaseConcurrency?.batchSize ?? 10,
226-
executor: async (descriptor, snapshotId) => {
227-
return await this.releaseConcurrencySystem.executeReleaseConcurrencyForSnapshot(
228-
snapshotId
229-
);
230-
},
231-
maxTokens: async (descriptor) => {
232-
const environment = await this.prisma.runtimeEnvironment.findFirstOrThrow({
233-
where: { id: descriptor.envId },
234-
select: {
235-
maximumConcurrencyLimit: true,
236-
},
237-
});
238-
239-
return (
240-
environment.maximumConcurrencyLimit *
241-
(options.releaseConcurrency?.maxTokensRatio ?? 1.0)
242-
);
243-
},
244-
keys: {
245-
fromDescriptor: (descriptor) =>
246-
`org:${descriptor.orgId}:proj:${descriptor.projectId}:env:${descriptor.envId}`,
247-
toDescriptor: (name) => ({
248-
orgId: name.split(":")[1],
249-
projectId: name.split(":")[3],
250-
envId: name.split(":")[5],
251-
}),
252-
},
253230
tracer: this.tracer,
254231
},
255232
});
@@ -306,6 +283,7 @@ export class RunEngine {
306283
delayedRunSystem: this.delayedRunSystem,
307284
machines: this.options.machines,
308285
retryWarmStartThresholdMs: this.options.retryWarmStartThresholdMs,
286+
releaseConcurrencySystem: this.releaseConcurrencySystem,
309287
});
310288

311289
this.dequeueSystem = new DequeueSystem({
@@ -1297,9 +1275,29 @@ export class RunEngine {
12971275
break;
12981276
}
12991277
case "SUSPENDED": {
1300-
//todo should we do a periodic check here for whether waitpoints are actually still blocking?
1301-
//we could at least log some things out if a run has been in this state for a long time
1302-
throw new NotImplementedError("Not implemented SUSPENDED");
1278+
const result = await this.waitpointSystem.continueRunIfUnblocked({ runId });
1279+
1280+
this.logger.info("handleStalledSnapshot SUSPENDED continueRunIfUnblocked", {
1281+
runId,
1282+
result,
1283+
snapshotId: latestSnapshot.id,
1284+
});
1285+
1286+
switch (result) {
1287+
case "blocked": {
1288+
// Reschedule the heartbeat
1289+
await this.executionSnapshotSystem.restartHeartbeatForRun({
1290+
runId,
1291+
});
1292+
break;
1293+
}
1294+
case "unblocked":
1295+
case "skipped": {
1296+
break;
1297+
}
1298+
}
1299+
1300+
break;
13031301
}
13041302
case "PENDING_CANCEL": {
13051303
//if the run is waiting to cancel but the worker hasn't confirmed that,

0 commit comments

Comments
 (0)