Skip to content

Commit e89fb92

Browse files
authored
re2: dev runs work without worker groups, fixed some type issues (#1756)
* In dev, the worker group is optional when triggering tasks (the master queue is defined by the environment). Also deprecated the TaskEvent.isDebug column and using TaskEventKind.LOG instead for debug events * Fixed a couple of type issues * More type fixes
1 parent e97704d commit e89fb92

File tree

9 files changed

+44
-53
lines changed

9 files changed

+44
-53
lines changed

apps/coordinator/src/checkpointer.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ export class Checkpointer {
277277
return result.checkpoint;
278278
} finally {
279279
if (opts.shouldHeartbeat) {
280+
// @ts-ignore - Some kind of node incompatible type issue
280281
clearInterval(interval);
281282
}
282283
removeCurrentAbortController();

apps/webapp/app/utils/taskEvent.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import {
1212
TaskEventStyle,
1313
unflattenAttributes,
1414
} from "@trigger.dev/core/v3";
15-
import { Prisma, TaskEvent } from "@trigger.dev/database";
15+
import { Prisma, TaskEvent, TaskEventKind } from "@trigger.dev/database";
1616
import { createTreeFromFlatItems, flattenTree } from "~/components/primitives/TreeView/TreeView";
1717
import type {
1818
PreparedEvent,
@@ -76,7 +76,7 @@ export function prepareTrace(events: TaskEvent[]): TraceSummary | undefined {
7676
level: event.level,
7777
events: event.events,
7878
environmentType: event.environmentType,
79-
isDebug: event.isDebug,
79+
isDebug: event.kind === TaskEventKind.LOG,
8080
},
8181
} satisfies SpanSummary;
8282

apps/webapp/app/v3/eventRepository.server.ts

Lines changed: 7 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import {
2020
omit,
2121
unflattenAttributes,
2222
} from "@trigger.dev/core/v3";
23-
import { Prisma, TaskEvent, TaskEventStatus, type TaskEventKind } from "@trigger.dev/database";
23+
import { Prisma, TaskEvent, TaskEventKind, TaskEventStatus } from "@trigger.dev/database";
2424
import { createHash } from "node:crypto";
2525
import { EventEmitter } from "node:stream";
2626
import { Gauge } from "prom-client";
@@ -126,10 +126,10 @@ export type QueriedEvent = Prisma.TaskEventGetPayload<{
126126
isError: true;
127127
isPartial: true;
128128
isCancelled: true;
129-
isDebug: true;
130129
level: true;
131130
events: true;
132131
environmentType: true;
132+
kind: true;
133133
};
134134
}>;
135135

@@ -186,26 +186,6 @@ export type UpdateEventOptions = {
186186
events?: SpanEvents;
187187
};
188188

189-
type TaskEventSummary = Pick<
190-
TaskEvent,
191-
| "id"
192-
| "spanId"
193-
| "parentId"
194-
| "runId"
195-
| "idempotencyKey"
196-
| "message"
197-
| "style"
198-
| "startTime"
199-
| "duration"
200-
| "isError"
201-
| "isPartial"
202-
| "isCancelled"
203-
| "level"
204-
| "events"
205-
| "environmentType"
206-
| "isDebug"
207-
>;
208-
209189
export class EventRepository {
210190
private readonly _flushScheduler: DynamicFlushScheduler<CreatableEvent>;
211191
private _randomIdGenerator = new RandomIdGenerator();
@@ -512,7 +492,7 @@ export class EventRepository {
512492
isError: event.isError,
513493
isPartial: ancestorCancelled ? false : event.isPartial,
514494
isCancelled: event.isCancelled === true ? true : event.isPartial && ancestorCancelled,
515-
isDebug: event.isDebug,
495+
isDebug: event.kind === TaskEventKind.LOG,
516496
startTime: getDateFromNanoseconds(event.startTime),
517497
level: event.level,
518498
events: event.events,
@@ -569,7 +549,7 @@ export class EventRepository {
569549
isError: true,
570550
isPartial: true,
571551
isCancelled: true,
572-
isDebug: true,
552+
kind: true,
573553
level: true,
574554
events: true,
575555
environmentType: true,
@@ -865,10 +845,8 @@ export class EventRepository {
865845
...options.attributes.metadata,
866846
};
867847

868-
const isDebug = options.attributes.isDebug;
869-
870848
const style = {
871-
[SemanticInternalAttributes.STYLE_ICON]: isDebug ? "warn" : "play",
849+
[SemanticInternalAttributes.STYLE_ICON]: options.attributes.isDebug ? "warn" : "play",
872850
};
873851

874852
if (!options.attributes.runId) {
@@ -883,12 +861,11 @@ export class EventRepository {
883861
message: message,
884862
serviceName: "api server",
885863
serviceNamespace: "trigger.dev",
886-
level: isDebug ? "WARN" : "TRACE",
887-
kind: options.kind,
864+
level: options.attributes.isDebug ? "WARN" : "TRACE",
865+
kind: options.attributes.isDebug ? TaskEventKind.LOG : options.kind,
888866
status: "OK",
889867
startTime,
890868
isPartial: false,
891-
isDebug,
892869
duration, // convert to nanoseconds
893870
environmentId: options.environment.id,
894871
environmentType: options.environment.type,

apps/webapp/app/v3/services/triggerTaskV2.server.ts

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -312,21 +312,7 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
312312
event.setAttribute("runId", runFriendlyId);
313313
span.setAttribute("runId", runFriendlyId);
314314

315-
const workerGroupService = new WorkerGroupService({
316-
prisma: this._prisma,
317-
engine: this._engine,
318-
});
319-
const workerGroup = await workerGroupService.getDefaultWorkerGroupForProject({
320-
projectId: environment.projectId,
321-
});
322-
323-
if (!workerGroup) {
324-
logger.error("Default worker group not found", {
325-
projectId: environment.projectId,
326-
});
327-
328-
return;
329-
}
315+
const masterQueue = await this.#getMasterQueueForEnvironment(environment);
330316

331317
const taskRun = await this._engine.trigger(
332318
{
@@ -351,7 +337,7 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
351337
concurrencyKey: body.options?.concurrencyKey,
352338
queueName,
353339
queue: body.options?.queue,
354-
masterQueue: workerGroup.masterQueue,
340+
masterQueue: masterQueue,
355341
isTest: body.options?.test ?? false,
356342
delayUntil,
357343
queuedAt: delayUntil ? undefined : new Date(),
@@ -441,6 +427,27 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
441427
});
442428
}
443429

430+
async #getMasterQueueForEnvironment(environment: AuthenticatedEnvironment) {
431+
if (environment.type === "DEVELOPMENT") {
432+
return;
433+
}
434+
435+
const workerGroupService = new WorkerGroupService({
436+
prisma: this._prisma,
437+
engine: this._engine,
438+
});
439+
440+
const workerGroup = await workerGroupService.getDefaultWorkerGroupForProject({
441+
projectId: environment.projectId,
442+
});
443+
444+
if (!workerGroup) {
445+
throw new ServiceValidationError("No worker group found");
446+
}
447+
448+
return workerGroup.masterQueue;
449+
}
450+
444451
async #getQueueName(taskId: string, environment: AuthenticatedEnvironment, queueName?: string) {
445452
if (queueName) {
446453
return queueName;

apps/webapp/app/v3/taskEventStore.server.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ export type TraceEvent = Pick<
2020
| "level"
2121
| "events"
2222
| "environmentType"
23-
| "isDebug"
23+
| "kind"
2424
>;
2525

2626
export type TaskEventStoreTable = "taskEvent" | "taskEventPartitioned";
@@ -138,7 +138,7 @@ export class TaskEventStore {
138138
level,
139139
events,
140140
"environmentType",
141-
"isDebug"
141+
"kind"
142142
FROM "TaskEventPartitioned"
143143
WHERE
144144
"traceId" = ${traceId}
@@ -168,7 +168,7 @@ export class TaskEventStore {
168168
level,
169169
events,
170170
"environmentType",
171-
"isDebug"
171+
"kind"
172172
FROM "TaskEvent"
173173
WHERE "traceId" = ${traceId}
174174
ORDER BY "startTime" ASC

internal-packages/database/prisma/schema.prisma

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2381,6 +2381,7 @@ model TaskEvent {
23812381
isError Boolean @default(false)
23822382
isPartial Boolean @default(false)
23832383
isCancelled Boolean @default(false)
2384+
/// deprecated: don't use this, moving this to properties, this now uses TaskEventKind.LOG
23842385
isDebug Boolean @default(false)
23852386
23862387
serviceName String

internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ describe("RunEngine batchTriggerAndWait", () => {
104104
batchId: batch.id,
105105
environmentId: authenticatedEnvironment.id,
106106
projectId: authenticatedEnvironment.projectId,
107+
organizationId: authenticatedEnvironment.organizationId,
107108
});
108109

109110
const afterBlockedByBatch = await engine.getRunExecutionData({ runId: parentRun.id });

internal-packages/run-engine/src/engine/types.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,14 @@ export type TriggerParams = {
5858
sdkVersion?: string;
5959
cliVersion?: string;
6060
concurrencyKey?: string;
61-
masterQueue: string;
61+
masterQueue?: string;
6262
queueName: string;
6363
queue?: QueueOptions;
6464
isTest: boolean;
6565
delayUntil?: Date;
6666
queuedAt?: Date;
6767
maxAttempts?: number;
68-
taskEventStore: string;
68+
taskEventStore?: string;
6969
priorityMs?: number;
7070
ttl?: string;
7171
tags: { id: string; name: string }[];

internal-packages/testcontainers/src/utils.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,10 @@ async function verifyRedisConnection(container: StartedRedisContainer) {
8787
},
8888
});
8989

90+
redis.on("error", (error) => {
91+
// swallow the error
92+
});
93+
9094
try {
9195
await redis.ping();
9296
} finally {

0 commit comments

Comments
 (0)