Skip to content

Commit c63559f

Browse files
authored
v4: fix race condition when continuing run when blocked at the same time (#2073)
* v4: fix race condition when continuing run when blocked at the same time
* Attempt to fix flaky e2e test
* Make waitpoint race condition test less flaky
1 parent 8471169 commit c63559f

File tree

6 files changed

+177
-11
lines changed

6 files changed

+177
-11
lines changed

internal-packages/run-engine/src/engine/index.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import { TtlSystem } from "./systems/ttlSystem.js";
5050
import { WaitpointSystem } from "./systems/waitpointSystem.js";
5151
import { EngineWorker, HeartbeatTimeouts, RunEngineOptions, TriggerParams } from "./types.js";
5252
import { workerCatalog } from "./workerCatalog.js";
53+
import { RaceSimulationSystem } from "./systems/raceSimulationSystem.js";
5354

5455
export class RunEngine {
5556
private runLockRedis: Redis;
@@ -73,6 +74,7 @@ export class RunEngine {
7374
ttlSystem: TtlSystem;
7475
pendingVersionSystem: PendingVersionSystem;
7576
releaseConcurrencySystem: ReleaseConcurrencySystem;
77+
raceSimulationSystem: RaceSimulationSystem = new RaceSimulationSystem();
7678

7779
constructor(private readonly options: RunEngineOptions) {
7880
this.prisma = options.prisma;
@@ -194,6 +196,7 @@ export class RunEngine {
194196
tracer: this.tracer,
195197
runLock: this.runLock,
196198
runQueue: this.runQueue,
199+
raceSimulationSystem: this.raceSimulationSystem,
197200
};
198201

199202
this.releaseConcurrencySystem = new ReleaseConcurrencySystem({
@@ -522,6 +525,7 @@ export class RunEngine {
522525
runId: parentTaskRunId,
523526
waitpoints: associatedWaitpoint.id,
524527
projectId: associatedWaitpoint.projectId,
528+
organizationId: environment.organization.id,
525529
batch,
526530
workerId,
527531
runnerId,
@@ -966,6 +970,7 @@ export class RunEngine {
966970
runId,
967971
waitpoints,
968972
projectId,
973+
organizationId,
969974
releaseConcurrency,
970975
timeout,
971976
spanIdToComplete,
@@ -990,6 +995,7 @@ export class RunEngine {
990995
runId,
991996
waitpoints,
992997
projectId,
998+
organizationId,
993999
releaseConcurrency,
9941000
timeout,
9951001
spanIdToComplete,
@@ -1140,6 +1146,10 @@ export class RunEngine {
11401146
}
11411147
}
11421148

1149+
/**
 * Registers a racepoint for a run by delegating to the engine's
 * `raceSimulationSystem`.
 *
 * @param runId - ID of the run the racepoint is registered for.
 * @param waitInterval - Interval forwarded unchanged to the race simulation
 *   system. NOTE(review): presumably milliseconds — confirm against
 *   `RaceSimulationSystem.registerRacepointForRun`.
 * @returns A promise (the method is `async`) that settles with whatever the
 *   delegated call returns.
 */
async registerRacepointForRun({ runId, waitInterval }: { runId: string; waitInterval: number }) {
1150+
return this.raceSimulationSystem.registerRacepointForRun({ runId, waitInterval });
1151+
}
1152+
11431153
async quit() {
11441154
try {
11451155
//stop the run queue
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import { promiseWithResolvers } from "@trigger.dev/core";
2+
3+
/**
 * Lets callers hold back code paths for a specific run for a fixed interval so
 * that concurrent operations can be lined up (used to simulate race conditions).
 *
 * A "racepoint" is a promise registered per run ID that resolves after
 * `waitInterval` milliseconds. Code that calls {@link waitForRacepoint} for
 * that run is suspended until the promise resolves; runs without a registered
 * racepoint are not delayed.
 */
export class RaceSimulationSystem {
  // A Map rather than a plain object: run IDs are dynamic strings, and with an
  // object literal a key such as "constructor" or "toString" would resolve to
  // an inherited Object.prototype member and be mistaken for a registered
  // racepoint.
  private racepoints = new Map<string, Promise<void>>();

  constructor() {}

  /**
   * Waits for the racepoint registered for `runId`, if any.
   *
   * @param runId - ID of the run to wait for.
   * @returns A promise that resolves once the run's racepoint has elapsed, or
   *   right away when no racepoint is registered for the run.
   */
  async waitForRacepoint({ runId }: { runId: string }): Promise<void> {
    // `await undefined` completes immediately, which covers the "no racepoint" case.
    await this.racepoints.get(runId);
  }

  /**
   * Registers a racepoint for `runId` that resolves after `waitInterval`
   * milliseconds. If the run already has a pending racepoint the call is a
   * no-op (the existing racepoint and its timer are kept).
   *
   * @param runId - ID of the run to register the racepoint for.
   * @param waitInterval - Delay in milliseconds passed to `setTimeout`.
   */
  registerRacepointForRun({ runId, waitInterval }: { runId: string; waitInterval: number }): void {
    if (this.racepoints.has(runId)) {
      return;
    }

    const racepoint = new Promise<void>((resolve) => {
      setTimeout(() => {
        // Drop the entry before releasing waiters so that the run can be
        // registered again and later waiters are not delayed.
        this.racepoints.delete(runId);
        resolve();
      }, waitInterval);
    });

    this.racepoints.set(runId, racepoint);
  }
}

internal-packages/run-engine/src/engine/systems/systems.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { RunQueue } from "../../run-queue/index.js";
55
import { EventBus } from "../eventBus.js";
66
import { RunLocker } from "../locking.js";
77
import { EngineWorker } from "../types.js";
8+
import { RaceSimulationSystem } from "./raceSimulationSystem.js";
89

910
export type SystemResources = {
1011
prisma: PrismaClient;
@@ -14,4 +15,5 @@ export type SystemResources = {
1415
tracer: Tracer;
1516
runLock: RunLocker;
1617
runQueue: RunQueue;
18+
raceSimulationSystem: RaceSimulationSystem;
1719
};

internal-packages/run-engine/src/engine/systems/waitpointSystem.ts

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,7 @@ export class WaitpointSystem {
353353
runId,
354354
waitpoints,
355355
projectId,
356+
organizationId,
356357
releaseConcurrency,
357358
timeout,
358359
spanIdToComplete,
@@ -364,6 +365,7 @@ export class WaitpointSystem {
364365
runId: string;
365366
waitpoints: string | string[];
366367
projectId: string;
368+
organizationId: string;
367369
releaseConcurrency?: boolean;
368370
timeout?: Date;
369371
spanIdToComplete?: string;
@@ -374,6 +376,8 @@ export class WaitpointSystem {
374376
}): Promise<TaskRunExecutionSnapshot> {
375377
const prisma = tx ?? this.$.prisma;
376378

379+
await this.$.raceSimulationSystem.waitForRacepoint({ runId });
380+
377381
let $waitpoints = typeof waitpoints === "string" ? [waitpoints] : waitpoints;
378382

379383
return await this.$.runLock.lock("blockRunWithWaitpoint", [runId], 5000, async () => {
@@ -439,7 +443,7 @@ export class WaitpointSystem {
439443
environmentId: snapshot.environmentId,
440444
environmentType: snapshot.environmentType,
441445
projectId: snapshot.projectId,
442-
organizationId: snapshot.organizationId,
446+
organizationId,
443447
// Do NOT carry over the batchId from the previous snapshot
444448
batchId: batch?.id,
445449
workerId,
@@ -495,6 +499,7 @@ export class WaitpointSystem {
495499
const blockingWaitpoints = await this.$.prisma.taskRunWaitpoint.findMany({
496500
where: { taskRunId: runId },
497501
select: {
502+
id: true,
498503
batchId: true,
499504
batchIndex: true,
500505
waitpoint: {
@@ -503,6 +508,8 @@ export class WaitpointSystem {
503508
},
504509
});
505510

511+
await this.$.raceSimulationSystem.waitForRacepoint({ runId });
512+
506513
// 2. There are blockers still, so do nothing
507514
if (blockingWaitpoints.some((w) => w.waitpoint.status !== "COMPLETED")) {
508515
this.$.logger.debug(`continueRunIfUnblocked: blocking waitpoints still exist`, {
@@ -657,16 +664,19 @@ export class WaitpointSystem {
657664
}
658665
});
659666

660-
//5. Remove the blocking waitpoints
661-
await this.$.prisma.taskRunWaitpoint.deleteMany({
662-
where: {
663-
taskRunId: runId,
664-
},
665-
});
667+
if (blockingWaitpoints.length > 0) {
668+
//5. Remove the blocking waitpoints
669+
await this.$.prisma.taskRunWaitpoint.deleteMany({
670+
where: {
671+
taskRunId: runId,
672+
id: { in: blockingWaitpoints.map((b) => b.id) },
673+
},
674+
});
666675

667-
this.$.logger.debug(`continueRunIfUnblocked: removed blocking waitpoints`, {
668-
runId,
669-
});
676+
this.$.logger.debug(`continueRunIfUnblocked: removed blocking waitpoints`, {
677+
runId,
678+
});
679+
}
670680
}
671681

672682
public async createRunAssociatedWaitpoint(
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
import { containerTest } from "@internal/testcontainers";
2+
import { trace } from "@internal/tracing";
3+
import { expect } from "vitest";
4+
import { RunEngine } from "../index.js";
5+
import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js";
6+
import { setTimeout } from "timers/promises";
7+
8+
// Raise the per-test timeout to 60 seconds for this file.
vi.setConfig({ testTimeout: 60_000 });
9+
10+
// Regression test for a race between completing a waitpoint and blocking the
// same run on a second waitpoint. The test registers a racepoint for the run,
// completes the first waitpoint, immediately blocks the run on a second one,
// and then asserts that the taskRunWaitpoint join row for the second waitpoint
// still exists.
describe("RunEngine Waitpoints – race condition", () => {
11+
containerTest(
12+
"join-row removed before run continues (failing race)",
13+
async ({ prisma, redisOptions }) => {
14+
const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
15+
const engine = new RunEngine({
16+
prisma,
17+
worker: { redis: redisOptions, workers: 1, tasksPerWorker: 10, pollIntervalMs: 100 },
18+
queue: { redis: redisOptions },
19+
runLock: { redis: redisOptions },
20+
machines: {
21+
defaultMachine: "small-1x",
22+
machines: {
23+
"small-1x": { name: "small-1x" as const, cpu: 0.5, memory: 0.5, centsPerMs: 0.0001 },
24+
},
25+
baseCostInCents: 0.0001,
26+
},
27+
tracer: trace.getTracer("test", "0.0.0"),
28+
});
29+
30+
try {
31+
const taskIdentifier = "test-task";
32+
await setupBackgroundWorker(engine, env, taskIdentifier);
33+
34+
// Trigger a run, dequeue it and start an attempt so that it can be blocked.
const run = await engine.trigger(
35+
{
36+
number: 1,
37+
friendlyId: "run_race",
38+
environment: env,
39+
taskIdentifier,
40+
payload: "{}",
41+
payloadType: "application/json",
42+
context: {},
43+
traceContext: {},
44+
traceId: "race-trace",
45+
spanId: "race-span",
46+
masterQueue: "main",
47+
queue: "task/test-task",
48+
isTest: false,
49+
tags: [],
50+
},
51+
prisma
52+
);
53+
54+
const dequeued = await engine.dequeueFromMasterQueue({
55+
consumerId: "test",
56+
masterQueue: run.masterQueue,
57+
maxRunCount: 10,
58+
});
59+
await engine.startRunAttempt({
60+
runId: dequeued[0].run.id,
61+
snapshotId: dequeued[0].snapshot.id,
62+
});
63+
64+
// create manual waitpoint
65+
const { waitpoint } = await engine.createManualWaitpoint({
66+
environmentId: env.id,
67+
projectId: env.projectId,
68+
});
69+
70+
// block the run
71+
await engine.blockRunWithWaitpoint({
72+
runId: run.id,
73+
waitpoints: waitpoint.id,
74+
projectId: env.projectId,
75+
organizationId: env.organizationId,
76+
});
77+
78+
// Create a second waitpoint. It is used to block the run again right after
79+
// continueRunIfUnblocked is triggered by the completeWaitpoint call below.
80+
const { waitpoint: waitpoint2 } = await engine.createManualWaitpoint({
81+
environmentId: env.id,
82+
projectId: env.projectId,
83+
});
84+
85+
// Register a 500ms racepoint for this run (the returned promise is not awaited).
// NOTE(review): presumably this holds back the engine code paths that call
// waitForRacepoint for the run so that they overlap — confirm against WaitpointSystem.
engine.registerRacepointForRun({ runId: run.id, waitInterval: 500 });
86+
87+
// complete the waitpoint (this will schedule a continueRunIfUnblocked job normally)
88+
await engine.completeWaitpoint({ id: waitpoint.id });
89+
90+
// Block the run on the second waitpoint, which is never completed in this test.
await engine.waitpointSystem.blockRunWithWaitpoint({
91+
runId: run.id,
92+
waitpoints: waitpoint2.id,
93+
projectId: env.projectId,
94+
organizationId: env.organizationId,
95+
});
96+
97+
// Wait 1s (longer than the 500ms racepoint) before inspecting the database.
await setTimeout(1000);
98+
99+
// The join row SHOULD still exist until the run progresses naturally.
100+
const joinRow = await prisma.taskRunWaitpoint.findFirst({
101+
where: { taskRunId: run.id, waitpointId: waitpoint2.id },
102+
});
103+
104+
// The join row must survive: it blocks on waitpoint2, which has not been completed.
// (Regression check for the race in which continuing the run removed join rows it had not read.)
105+
expect(joinRow).not.toBeNull();
106+
} finally {
107+
// Always shut the engine down, even when a step above throws.
await engine.quit();
108+
}
109+
}
110+
);
111+
});

packages/cli-v3/e2e/fixtures.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ export const fixturesConfig: TestCase[] = [
6666
{
6767
task: { id: "helloWorld", filePath: "src/trigger/helloWorld.ts", exportName: "helloWorld" },
6868
payload: "{}",
69-
result: { ok: true, durationMs: 1000 },
69+
result: { ok: true, durationMs: 500 },
7070
},
7171
],
7272
tsconfig: "tsconfig.json",

0 commit comments

Comments
 (0)