Skip to content

Commit 2b586c8

Browse files
authored
Fix controller waitpoint resolution, suspendable state, and snapshot race conditions (#2006)
* remove dead code * rename managed to shared runtime manager * rename to resolve waitpoint for clarity * add resolver id helper * store and correctly resolve waitpoints that come in early * fix ipc message type change * branded type for resolver ids * add fixme comments * remove more unused ipc schemas * fix entitlement validation when client doesn't exist * restore hello world reference workspace imports * runtime manager debug logs * prefix engine run logs * managed run logger accepts nested props * runtime suspendable state and improved logs * require suspendable state for checkpoints, fix snapshot processing queue * add terminal link as cli module so we can more easily patch it * apply cursor patch * add license info * remove terminal-link package and add deprecation notice * remove old patch * remove terminal-link from sdk * rename snapshot module * add cli test tsconfig * add run logger base type * add snapshot manager tests * fix cli builds * improve QUEUED_EXECUTING test * changeset * make testcontainers wait until container has stopped * require unit tests for publishing again * avoid mutation during iteration when resolving pending waitpoints * improve debug logs and make them less noisy * always update poller snapshot id for accurate logs * detach task run process handlers * check for env overrides in a few more places and add verbose logs * log when poller is still executing when we stop it * add supervisor to publish workflow * always print full deploy logs in CI * Revert "avoid mutation during iteration when resolving pending waitpoints" This reverts commit 87b0ce1. * disable pre * print prerelease script errors * Revert "disable pre" This reverts commit 9403409. 
* misc fixes * better debug logs * add snapshots since methods and route * prep for snapshots since * improve deprecated execution detection * update supervisor and schema * properly log http server errors * detect restore after failed snapshot fetch * run and snapshot id can be overridden * fix restore detection * fix deprecation checks, move into snapshot manager * less logs * rename snapshot manager stop * restore detection was moved into snapshot manager * fix notifier logs * make runtime manager status a debug log * no need to attach runtime status twice * findUnique -> findFirst * sort snapshots by created at everywhere
1 parent 053389d commit 2b586c8

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+2723
-937
lines changed

.changeset/plenty-dolphins-act.md

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
"trigger.dev": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
- Correctly resolve waitpoints that come in early
7+
- Ensure correct state before requesting suspension
8+
- Fix race conditions in snapshot processing

.changeset/sweet-dolphins-invent.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"trigger.dev": patch
3+
---
4+
5+
Always print full deploy logs in CI

apps/supervisor/src/workloadServer/index.ts

+26
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import {
1717
WorkloadRunAttemptStartRequestBody,
1818
type WorkloadRunAttemptStartResponseBody,
1919
type WorkloadRunLatestSnapshotResponseBody,
20+
WorkloadRunSnapshotsSinceResponseBody,
2021
type WorkloadServerToClientEvents,
2122
type WorkloadSuspendRunResponseBody,
2223
} from "@trigger.dev/core/v3/workers";
@@ -341,6 +342,31 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
341342
} satisfies WorkloadRunLatestSnapshotResponseBody);
342343
},
343344
})
345+
.route(
346+
"/api/v1/workload-actions/runs/:runFriendlyId/snapshots/since/:snapshotFriendlyId",
347+
"GET",
348+
{
349+
paramsSchema: WorkloadActionParams,
350+
handler: async ({ req, reply, params }) => {
351+
const sinceSnapshotResponse = await this.workerClient.getSnapshotsSince(
352+
params.runFriendlyId,
353+
params.snapshotFriendlyId,
354+
this.runnerIdFromRequest(req)
355+
);
356+
357+
if (!sinceSnapshotResponse.success) {
358+
console.error("Failed to get snapshots since", {
359+
runId: params.runFriendlyId,
360+
error: sinceSnapshotResponse.error,
361+
});
362+
reply.empty(500);
363+
return;
364+
}
365+
366+
reply.json(sinceSnapshotResponse.data satisfies WorkloadRunSnapshotsSinceResponseBody);
367+
},
368+
}
369+
)
344370
.route("/api/v1/workload-actions/runs/:runFriendlyId/logs/debug", "POST", {
345371
paramsSchema: WorkloadActionParams.pick({ runFriendlyId: true }),
346372
bodySchema: WorkloadDebugLogRequestBody,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import { json, TypedResponse } from "@remix-run/server-runtime";
2+
import { WorkerApiRunSnapshotsSinceResponseBody } from "@trigger.dev/core/v3/workers";
3+
import { z } from "zod";
4+
import { createLoaderWorkerApiRoute } from "~/services/routeBuilders/apiBuilder.server";
5+
6+
export const loader = createLoaderWorkerApiRoute(
7+
{
8+
params: z.object({
9+
runFriendlyId: z.string(),
10+
snapshotId: z.string(),
11+
}),
12+
},
13+
async ({
14+
authenticatedWorker,
15+
params,
16+
}): Promise<TypedResponse<WorkerApiRunSnapshotsSinceResponseBody>> => {
17+
const { runFriendlyId, snapshotId } = params;
18+
19+
const snapshots = await authenticatedWorker.getSnapshotsSince({
20+
runFriendlyId,
21+
snapshotId,
22+
});
23+
24+
if (!snapshots) {
25+
throw new Error("Failed to retrieve snapshots since given snapshot");
26+
}
27+
28+
return json({ snapshots });
29+
}
30+
);

apps/webapp/app/v3/runEngineHandlers.server.ts

+4-2
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ export function registerRunEngineEventBusHandlers() {
401401
engine.eventBus.on("executionSnapshotCreated", async ({ time, run, snapshot }) => {
402402
const eventResult = await recordRunDebugLog(
403403
run.id,
404-
`${snapshot.executionStatus} - ${snapshot.description}`,
404+
`[engine] ${snapshot.executionStatus} - ${snapshot.description}`,
405405
{
406406
attributes: {
407407
properties: {
@@ -450,6 +450,7 @@ export function registerRunEngineEventBusHandlers() {
450450
// Record notification event
451451
const eventResult = await recordRunDebugLog(
452452
run.id,
453+
// don't prefix this with [engine] - "run:notify" is the correct prefix
453454
`run:notify platform -> supervisor: ${snapshot.executionStatus}`,
454455
{
455456
attributes: {
@@ -479,6 +480,7 @@ export function registerRunEngineEventBusHandlers() {
479480
// Record notification event
480481
const eventResult = await recordRunDebugLog(
481482
run.id,
483+
// don't prefix this with [engine] - "run:notify" is the correct prefix
482484
`run:notify ERROR platform -> supervisor: ${snapshot.executionStatus}`,
483485
{
484486
attributes: {
@@ -505,7 +507,7 @@ export function registerRunEngineEventBusHandlers() {
505507
engine.eventBus.on("incomingCheckpointDiscarded", async ({ time, run, snapshot, checkpoint }) => {
506508
const eventResult = await recordRunDebugLog(
507509
run.id,
508-
`Checkpoint discarded: ${checkpoint.discardReason}`,
510+
`[engine] Checkpoint discarded: ${checkpoint.discardReason}`,
509511
{
510512
attributes: {
511513
properties: {

apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts

+13
Original file line numberDiff line numberDiff line change
@@ -759,6 +759,19 @@ export class AuthenticatedWorkerInstance extends WithRunEngine {
759759
});
760760
}
761761

762+
async getSnapshotsSince({
763+
runFriendlyId,
764+
snapshotId,
765+
}: {
766+
runFriendlyId: string;
767+
snapshotId: string;
768+
}) {
769+
return await this._engine.getSnapshotsSince({
770+
runId: fromFriendlyId(runFriendlyId),
771+
snapshotId: fromFriendlyId(snapshotId),
772+
});
773+
}
774+
762775
toJSON(): WorkerGroupTokenAuthenticationResponse {
763776
if (this.type === WorkerInstanceGroupType.MANAGED) {
764777
return {

internal-packages/run-engine/src/engine/index.ts

+24-37
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ import { EnqueueSystem } from "./systems/enqueueSystem.js";
4343
import {
4444
ExecutionSnapshotSystem,
4545
getLatestExecutionSnapshot,
46+
getExecutionSnapshotsSince,
47+
executionDataFromSnapshot,
4648
} from "./systems/executionSnapshotSystem.js";
4749
import { PendingVersionSystem } from "./systems/pendingVersionSystem.js";
4850
import { ReleaseConcurrencySystem } from "./systems/releaseConcurrencySystem.js";
@@ -1100,43 +1102,31 @@ export class RunEngine {
11001102
const prisma = tx ?? this.prisma;
11011103
try {
11021104
const snapshot = await getLatestExecutionSnapshot(prisma, runId);
1105+
return executionDataFromSnapshot(snapshot);
1106+
} catch (e) {
1107+
this.logger.error("Failed to getRunExecutionData", {
1108+
message: e instanceof Error ? e.message : e,
1109+
});
1110+
return null;
1111+
}
1112+
}
11031113

1104-
const executionData: RunExecutionData = {
1105-
version: "1" as const,
1106-
snapshot: {
1107-
id: snapshot.id,
1108-
friendlyId: snapshot.friendlyId,
1109-
executionStatus: snapshot.executionStatus,
1110-
description: snapshot.description,
1111-
},
1112-
run: {
1113-
id: snapshot.runId,
1114-
friendlyId: snapshot.runFriendlyId,
1115-
status: snapshot.runStatus,
1116-
attemptNumber: snapshot.attemptNumber ?? undefined,
1117-
},
1118-
batch: snapshot.batchId
1119-
? {
1120-
id: snapshot.batchId,
1121-
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
1122-
}
1123-
: undefined,
1124-
checkpoint: snapshot.checkpoint
1125-
? {
1126-
id: snapshot.checkpoint.id,
1127-
friendlyId: snapshot.checkpoint.friendlyId,
1128-
type: snapshot.checkpoint.type,
1129-
location: snapshot.checkpoint.location,
1130-
imageRef: snapshot.checkpoint.imageRef,
1131-
reason: snapshot.checkpoint.reason ?? undefined,
1132-
}
1133-
: undefined,
1134-
completedWaitpoints: snapshot.completedWaitpoints,
1135-
};
1114+
async getSnapshotsSince({
1115+
runId,
1116+
snapshotId,
1117+
tx,
1118+
}: {
1119+
runId: string;
1120+
snapshotId: string;
1121+
tx?: PrismaClientOrTransaction;
1122+
}): Promise<RunExecutionData[] | null> {
1123+
const prisma = tx ?? this.prisma;
11361124

1137-
return executionData;
1125+
try {
1126+
const snapshots = await getExecutionSnapshotsSince(prisma, runId, snapshotId);
1127+
return snapshots.map(executionDataFromSnapshot);
11381128
} catch (e) {
1139-
this.logger.error("Failed to getRunExecutionData", {
1129+
this.logger.error("Failed to getSnapshotsSince", {
11401130
message: e instanceof Error ? e.message : e,
11411131
});
11421132
return null;
@@ -1158,9 +1148,6 @@ export class RunEngine {
11581148
}
11591149
}
11601150

1161-
//#endregion
1162-
1163-
//#region Heartbeat
11641151
async #handleStalledSnapshot({
11651152
runId,
11661153
snapshotId,

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

+1
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,7 @@ export class DequeueSystem {
409409
friendlyId: newSnapshot.friendlyId,
410410
executionStatus: newSnapshot.executionStatus,
411411
description: newSnapshot.description,
412+
createdAt: newSnapshot.createdAt,
412413
},
413414
image: result.deployment?.imageReference ?? undefined,
414415
checkpoint: newSnapshot.checkpoint ?? undefined,

internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts

+100-19
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { CompletedWaitpoint, ExecutionResult } from "@trigger.dev/core/v3";
1+
import { CompletedWaitpoint, ExecutionResult, RunExecutionData } from "@trigger.dev/core/v3";
22
import { BatchId, RunId, SnapshotId } from "@trigger.dev/core/v3/isomorphic";
33
import {
44
Prisma,
@@ -17,31 +17,23 @@ export type ExecutionSnapshotSystemOptions = {
1717
heartbeatTimeouts: HeartbeatTimeouts;
1818
};
1919

20-
export interface LatestExecutionSnapshot extends TaskRunExecutionSnapshot {
20+
export interface EnhancedExecutionSnapshot extends TaskRunExecutionSnapshot {
2121
friendlyId: string;
2222
runFriendlyId: string;
2323
checkpoint: TaskRunCheckpoint | null;
2424
completedWaitpoints: CompletedWaitpoint[];
2525
}
2626

27-
/* Gets the most recent valid snapshot for a run */
28-
export async function getLatestExecutionSnapshot(
29-
prisma: PrismaClientOrTransaction,
30-
runId: string
31-
): Promise<LatestExecutionSnapshot> {
32-
const snapshot = await prisma.taskRunExecutionSnapshot.findFirst({
33-
where: { runId, isValid: true },
34-
include: {
35-
completedWaitpoints: true,
36-
checkpoint: true,
37-
},
38-
orderBy: { createdAt: "desc" },
39-
});
40-
41-
if (!snapshot) {
42-
throw new Error(`No execution snapshot found for TaskRun ${runId}`);
43-
}
27+
type ExecutionSnapshotWithCheckAndWaitpoints = Prisma.TaskRunExecutionSnapshotGetPayload<{
28+
include: {
29+
checkpoint: true;
30+
completedWaitpoints: true;
31+
};
32+
}>;
4433

34+
function enhanceExecutionSnapshot(
35+
snapshot: ExecutionSnapshotWithCheckAndWaitpoints
36+
): EnhancedExecutionSnapshot {
4537
return {
4638
...snapshot,
4739
friendlyId: SnapshotId.toFriendlyId(snapshot.id),
@@ -99,6 +91,27 @@ export async function getLatestExecutionSnapshot(
9991
};
10092
}
10193

94+
/* Gets the most recent valid snapshot for a run */
95+
export async function getLatestExecutionSnapshot(
96+
prisma: PrismaClientOrTransaction,
97+
runId: string
98+
): Promise<EnhancedExecutionSnapshot> {
99+
const snapshot = await prisma.taskRunExecutionSnapshot.findFirst({
100+
where: { runId, isValid: true },
101+
include: {
102+
completedWaitpoints: true,
103+
checkpoint: true,
104+
},
105+
orderBy: { createdAt: "desc" },
106+
});
107+
108+
if (!snapshot) {
109+
throw new Error(`No execution snapshot found for TaskRun ${runId}`);
110+
}
111+
112+
return enhanceExecutionSnapshot(snapshot);
113+
}
114+
102115
export async function getExecutionSnapshotCompletedWaitpoints(
103116
prisma: PrismaClientOrTransaction,
104117
snapshotId: string
@@ -131,6 +144,7 @@ export function executionResultFromSnapshot(snapshot: TaskRunExecutionSnapshot):
131144
friendlyId: SnapshotId.toFriendlyId(snapshot.id),
132145
executionStatus: snapshot.executionStatus,
133146
description: snapshot.description,
147+
createdAt: snapshot.createdAt,
134148
},
135149
run: {
136150
id: snapshot.runId,
@@ -141,6 +155,73 @@ export function executionResultFromSnapshot(snapshot: TaskRunExecutionSnapshot):
141155
};
142156
}
143157

158+
export function executionDataFromSnapshot(snapshot: EnhancedExecutionSnapshot): RunExecutionData {
159+
return {
160+
version: "1" as const,
161+
snapshot: {
162+
id: snapshot.id,
163+
friendlyId: snapshot.friendlyId,
164+
executionStatus: snapshot.executionStatus,
165+
description: snapshot.description,
166+
createdAt: snapshot.createdAt,
167+
},
168+
run: {
169+
id: snapshot.runId,
170+
friendlyId: snapshot.runFriendlyId,
171+
status: snapshot.runStatus,
172+
attemptNumber: snapshot.attemptNumber ?? undefined,
173+
},
174+
batch: snapshot.batchId
175+
? {
176+
id: snapshot.batchId,
177+
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
178+
}
179+
: undefined,
180+
checkpoint: snapshot.checkpoint
181+
? {
182+
id: snapshot.checkpoint.id,
183+
friendlyId: snapshot.checkpoint.friendlyId,
184+
type: snapshot.checkpoint.type,
185+
location: snapshot.checkpoint.location,
186+
imageRef: snapshot.checkpoint.imageRef,
187+
reason: snapshot.checkpoint.reason ?? undefined,
188+
}
189+
: undefined,
190+
completedWaitpoints: snapshot.completedWaitpoints,
191+
};
192+
}
193+
194+
export async function getExecutionSnapshotsSince(
195+
prisma: PrismaClientOrTransaction,
196+
runId: string,
197+
sinceSnapshotId: string
198+
): Promise<EnhancedExecutionSnapshot[]> {
199+
// Find the createdAt of the sinceSnapshotId
200+
const sinceSnapshot = await prisma.taskRunExecutionSnapshot.findFirst({
201+
where: { id: sinceSnapshotId },
202+
select: { createdAt: true },
203+
});
204+
205+
if (!sinceSnapshot) {
206+
throw new Error(`No execution snapshot found for id ${sinceSnapshotId}`);
207+
}
208+
209+
const snapshots = await prisma.taskRunExecutionSnapshot.findMany({
210+
where: {
211+
runId,
212+
isValid: true,
213+
createdAt: { gt: sinceSnapshot.createdAt },
214+
},
215+
include: {
216+
completedWaitpoints: true,
217+
checkpoint: true,
218+
},
219+
orderBy: { createdAt: "asc" },
220+
});
221+
222+
return snapshots.map(enhanceExecutionSnapshot);
223+
}
224+
144225
export class ExecutionSnapshotSystem {
145226
private readonly $: SystemResources;
146227
private readonly heartbeatTimeouts: HeartbeatTimeouts;

internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts

+1
Original file line numberDiff line numberDiff line change
@@ -892,6 +892,7 @@ export class RunAttemptSystem {
892892
friendlyId: newSnapshot.friendlyId,
893893
executionStatus: newSnapshot.executionStatus,
894894
description: newSnapshot.description,
895+
createdAt: newSnapshot.createdAt,
895896
},
896897
run: {
897898
id: newSnapshot.runId,

0 commit comments

Comments
 (0)