diff --git a/.changeset/sour-mirrors-accept.md b/.changeset/sour-mirrors-accept.md new file mode 100644 index 0000000000..34084228ca --- /dev/null +++ b/.changeset/sour-mirrors-accept.md @@ -0,0 +1,6 @@ +--- +"trigger.dev": patch +"@trigger.dev/core": patch +--- + +Improve usage flushing diff --git a/packages/cli-v3/src/entryPoints/managed-run-controller.ts b/packages/cli-v3/src/entryPoints/managed-run-controller.ts index 4baa701b05..212dc6a19c 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-controller.ts @@ -2,6 +2,9 @@ import { env as stdEnv } from "std-env"; import { readJSONFile } from "../utilities/fileSystem.js"; import { WorkerManifest } from "@trigger.dev/core/v3"; import { ManagedRunController } from "./managed/controller.js"; +import { logger } from "../utilities/logger.js"; + +logger.loggerLevel = "debug"; const manifest = await readJSONFile("./index.json"); const workerManifest = WorkerManifest.parse(manifest); diff --git a/packages/cli-v3/src/entryPoints/managed-run-worker.ts b/packages/cli-v3/src/entryPoints/managed-run-worker.ts index 56f81c2aac..b9a085809d 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-worker.ts @@ -98,9 +98,6 @@ process.on("uncaughtException", function (error, origin) { } }); -const usageIntervalMs = getEnvVar("USAGE_HEARTBEAT_INTERVAL_MS"); -const usageEventUrl = getEnvVar("USAGE_EVENT_URL"); -const triggerJWT = getEnvVar("TRIGGER_JWT"); const heartbeatIntervalMs = getEnvVar("HEARTBEAT_INTERVAL_MS"); const standardLocalsManager = new StandardLocalsManager(); @@ -112,17 +109,8 @@ lifecycleHooks.setGlobalLifecycleHooksManager(standardLifecycleHooksManager); const standardRunTimelineMetricsManager = new StandardRunTimelineMetricsManager(); runTimelineMetrics.setGlobalManager(standardRunTimelineMetricsManager); -const devUsageManager = new DevUsageManager(); -const prodUsageManager = new ProdUsageManager(devUsageManager, { - heartbeatIntervalMs: usageIntervalMs ? parseInt(usageIntervalMs, 10) : undefined, - url: usageEventUrl, - jwt: triggerJWT, -}); - -usage.setGlobalUsageManager(prodUsageManager); -timeout.setGlobalManager(new UsageTimeoutManager(devUsageManager)); - resourceCatalog.setGlobalResourceCatalog(new StandardResourceCatalog()); + const durableClock = new DurableClock(); clock.setGlobalClock(durableClock); const runMetadataManager = new StandardMetadataManager( @@ -258,6 +246,12 @@ const zodIpc = new ZodIpcConnection({ }); } + initializeUsageManager({ + usageIntervalMs: getEnvVar("USAGE_HEARTBEAT_INTERVAL_MS"), + usageEventUrl: getEnvVar("USAGE_EVENT_URL"), + triggerJWT: getEnvVar("TRIGGER_JWT"), + }); + standardRunTimelineMetricsManager.registerMetricsFromExecution(metrics); console.log(`[${new Date().toISOString()}] Received EXECUTE_TASK_RUN`, execution); @@ -509,7 +503,7 @@ async function flushAll(timeoutInMs: number = 10_000) { async function flushUsage(timeoutInMs: number = 10_000) { const now = performance.now(); - await Promise.race([prodUsageManager.flush(), setTimeout(timeoutInMs)]); + await Promise.race([usage.flush(), setTimeout(timeoutInMs)]); const duration = performance.now() - now; @@ -551,6 +545,26 @@ async function flushMetadata(timeoutInMs: number = 10_000) { }; } +function initializeUsageManager({ + usageIntervalMs, + usageEventUrl, + triggerJWT, +}: { + usageIntervalMs?: string; + usageEventUrl?: string; + triggerJWT?: string; +}) { + const devUsageManager = new DevUsageManager(); + const prodUsageManager = new ProdUsageManager(devUsageManager, { + heartbeatIntervalMs: usageIntervalMs ? parseInt(usageIntervalMs, 10) : undefined, + url: usageEventUrl, + jwt: triggerJWT, + }); + + usage.setGlobalUsageManager(prodUsageManager); + timeout.setGlobalManager(new UsageTimeoutManager(devUsageManager)); +} + const managedWorkerRuntime = new ManagedRuntimeManager(zodIpc, true); runtime.setGlobalRuntimeManager(managedWorkerRuntime); diff --git a/packages/cli-v3/src/entryPoints/managed/controller.ts b/packages/cli-v3/src/entryPoints/managed/controller.ts index 35fec13932..d6685e8c84 100644 --- a/packages/cli-v3/src/entryPoints/managed/controller.ts +++ b/packages/cli-v3/src/entryPoints/managed/controller.ts @@ -253,6 +253,8 @@ export class ManagedRunController { this.waitForNextRunLock = true; + const previousRunId = this.runFriendlyId; + try { if (!this.warmStartClient) { this.sendDebugLog({ @@ -262,8 +264,6 @@ export class ManagedRunController { this.exitProcess(this.successExitCode); } - const previousRunId = this.runFriendlyId; - if (this.currentExecution?.taskRunEnv) { this.sendDebugLog({ runId: this.runFriendlyId, @@ -307,14 +307,14 @@ export class ManagedRunController { }; this.sendDebugLog({ - runId: this.runFriendlyId, + runId: previousRunId, message: "waitForNextRun: connected to warm start service", properties: warmStartConfig, }); if (!connectionTimeoutMs || !keepaliveMs) { this.sendDebugLog({ - runId: this.runFriendlyId, + runId: previousRunId, message: "waitForNextRun: warm starts disabled after connect", properties: warmStartConfig, }); @@ -329,7 +329,7 @@ export class ManagedRunController { if (!nextRun) { this.sendDebugLog({ - runId: this.runFriendlyId, + runId: previousRunId, message: "waitForNextRun: warm start failed, shutting down", properties: warmStartConfig, }); @@ -339,7 +339,7 @@ export class ManagedRunController { this.warmStartCount++; this.sendDebugLog({ - runId: this.runFriendlyId, + runId: previousRunId, message: "waitForNextRun: got next run", properties: { ...warmStartConfig, @@ -356,7 +356,7 @@ export class ManagedRunController { }).finally(() => {}); } catch (error) { this.sendDebugLog({ - runId: this.runFriendlyId, + runId: previousRunId, message: "waitForNextRun: unexpected error", properties: { error: error instanceof Error ? error.message : String(error) }, }); diff --git a/packages/core/src/v3/usage/api.ts b/packages/core/src/v3/usage/api.ts index f476b23bfa..338cc8cf80 100644 --- a/packages/core/src/v3/usage/api.ts +++ b/packages/core/src/v3/usage/api.ts @@ -44,6 +44,10 @@ export class UsageAPI implements UsageManager { return this.#getUsageManager().sample(); } + public flush(): Promise { + return this.#getUsageManager().flush(); + } + #getUsageManager(): UsageManager { return getGlobal(API_NAME) ?? NOOP_USAGE_MANAGER; } diff --git a/packages/core/src/v3/usage/devUsageManager.ts b/packages/core/src/v3/usage/devUsageManager.ts index d4579813a3..fea5d2fa97 100644 --- a/packages/core/src/v3/usage/devUsageManager.ts +++ b/packages/core/src/v3/usage/devUsageManager.ts @@ -48,6 +48,8 @@ export class DevUsageManager implements UsageManager { disable(): void {} + async flush(): Promise {} + sample(): UsageSample | undefined { return this._firstMeasurement?.sample(); } diff --git a/packages/core/src/v3/usage/noopUsageManager.ts b/packages/core/src/v3/usage/noopUsageManager.ts index 38b42198e2..4369fd3ca7 100644 --- a/packages/core/src/v3/usage/noopUsageManager.ts +++ b/packages/core/src/v3/usage/noopUsageManager.ts @@ -5,6 +5,10 @@ export class NoopUsageManager implements UsageManager { // Noop } + async flush(): Promise { + // Noop + } + start(): UsageMeasurement { return { sample: () => ({ cpuTime: 0, wallTime: 0 }), diff --git a/packages/core/src/v3/usage/types.ts b/packages/core/src/v3/usage/types.ts index 363fe84f66..5b763cc499 100644 --- a/packages/core/src/v3/usage/types.ts +++ b/packages/core/src/v3/usage/types.ts @@ -13,4 +13,5 @@ export interface UsageManager { stop(measurement: UsageMeasurement): UsageSample; sample(): UsageSample | undefined; pauseAsync(cb: () => Promise): Promise; + flush(): Promise; }