diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index 8736ad038b..2b4ca5eb5e 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -1,21 +1,41 @@ import { randomUUID } from "crypto"; import { env as stdEnv } from "std-env"; import { z } from "zod"; -import { getDockerHostDomain } from "./util.js"; + +const BoolEnv = z.preprocess((val) => { + if (typeof val !== "string") { + return val; + } + + return ["true", "1"].includes(val.toLowerCase().trim()); +}, z.boolean()); const Env = z.object({ - // This will come from `status.hostIP` in k8s - WORKER_HOST_IP: z.string().default(getDockerHostDomain()), - TRIGGER_API_URL: z.string().url(), - TRIGGER_WORKER_TOKEN: z.string(), // This will come from `spec.nodeName` in k8s TRIGGER_WORKER_INSTANCE_NAME: z.string().default(randomUUID()), + + // Required settings + TRIGGER_API_URL: z.string().url(), + TRIGGER_WORKER_TOKEN: z.string(), MANAGED_WORKER_SECRET: z.string(), - TRIGGER_WORKLOAD_API_PORT: z.coerce.number().default(8020), - TRIGGER_WORKLOAD_API_PORT_EXTERNAL: z.coerce.number().default(8020), + + // Workload API settings (coordinator mode) - the workload API is what the run controller connects to + TRIGGER_WORKLOAD_API_ENABLED: BoolEnv.default("true"), + TRIGGER_WORKLOAD_API_PROTOCOL: z + .string() + .transform((s) => z.enum(["http", "https"]).parse(s.toLowerCase())) + .default("http"), + TRIGGER_WORKLOAD_API_DOMAIN: z.string().optional(), // If unset, will use orchestrator-specific default + TRIGGER_WORKLOAD_API_PORT_INTERNAL: z.coerce.number().default(8020), // This is the port the workload API listens on + TRIGGER_WORKLOAD_API_PORT_EXTERNAL: z.coerce.number().default(8020), // This is the exposed port passed to the run controller + + // Dequeue settings (provider mode) + TRIGGER_DEQUEUE_ENABLED: BoolEnv.default("true"), + TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(1000), + + // Optional services TRIGGER_WARM_START_URL: z.string().optional(), TRIGGER_CHECKPOINT_URL: z.string().optional(), - TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(1000), // Used by the workload manager, e.g docker/k8s DOCKER_NETWORK: z.string().default("host"), diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index b7f2400aa6..7ea3f03e0a 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -29,7 +29,9 @@ class ManagedSupervisor { private readonly warmStartUrl = env.TRIGGER_WARM_START_URL; constructor() { - const workerApiUrl = `http://${env.WORKER_HOST_IP}:${env.TRIGGER_WORKLOAD_API_PORT_EXTERNAL}`; + const workloadApiProtocol = env.TRIGGER_WORKLOAD_API_PROTOCOL; + const workloadApiDomain = env.TRIGGER_WORKLOAD_API_DOMAIN; + const workloadApiPortExternal = env.TRIGGER_WORKLOAD_API_PORT_EXTERNAL; if (this.warmStartUrl) { this.logger.log("[ManagedWorker] 🔥 Warm starts enabled", { @@ -40,13 +42,17 @@ class ManagedSupervisor { if (this.isKubernetes) { this.resourceMonitor = new KubernetesResourceMonitor(createK8sApi(), ""); this.workloadManager = new KubernetesWorkloadManager({ - workerApiUrl, + workloadApiProtocol, + workloadApiDomain, + workloadApiPort: workloadApiPortExternal, warmStartUrl: this.warmStartUrl, }); } else { this.resourceMonitor = new DockerResourceMonitor(new Docker()); this.workloadManager = new DockerWorkloadManager({ - workerApiUrl, + workloadApiProtocol, + workloadApiDomain, + workloadApiPort: workloadApiPortExternal, warmStartUrl: this.warmStartUrl, }); } @@ -57,6 +63,8 @@ class ManagedSupervisor { instanceName: env.TRIGGER_WORKER_INSTANCE_NAME, managedWorkerSecret: env.MANAGED_WORKER_SECRET, dequeueIntervalMs: env.TRIGGER_DEQUEUE_INTERVAL_MS, + queueConsumerEnabled: env.TRIGGER_DEQUEUE_ENABLED, + runNotificationsEnabled: env.TRIGGER_WORKLOAD_API_ENABLED, preDequeue: async () => { if (this.isKubernetes) { // TODO: Test k8s resource monitor and remove this @@ -180,7 +188,7 @@ class ManagedSupervisor { // Responds to workload requests only this.workloadServer = new WorkloadServer({ - port: env.TRIGGER_WORKLOAD_API_PORT, + port: env.TRIGGER_WORKLOAD_API_PORT_INTERNAL, workerClient: this.workerSession.httpClient, checkpointClient: this.checkpointClient, }); @@ -238,7 +246,17 @@ class ManagedSupervisor { async start() { this.logger.log("[ManagedWorker] Starting up"); - await this.workloadServer.start(); + if (env.TRIGGER_WORKLOAD_API_ENABLED) { + this.logger.log("[ManagedWorker] Workload API enabled", { + protocol: env.TRIGGER_WORKLOAD_API_PROTOCOL, + domain: env.TRIGGER_WORKLOAD_API_DOMAIN, + port: env.TRIGGER_WORKLOAD_API_PORT_INTERNAL, + }); + await this.workloadServer.start(); + } else { + this.logger.warn("[ManagedWorker] Workload API disabled"); + } + await this.workerSession.start(); await this.httpServer.start(); diff --git a/apps/supervisor/src/workloadManager/docker.ts b/apps/supervisor/src/workloadManager/docker.ts index 48c0b9b7f3..950ad2a343 100644 --- a/apps/supervisor/src/workloadManager/docker.ts +++ b/apps/supervisor/src/workloadManager/docker.ts @@ -6,12 +6,18 @@ import { } from "./types.js"; import { x } from "tinyexec"; import { env } from "../env.js"; -import { RunnerId } from "../util.js"; +import { getDockerHostDomain, RunnerId } from "../util.js"; export class DockerWorkloadManager implements WorkloadManager { private readonly logger = new SimpleStructuredLogger("docker-workload-provider"); - constructor(private opts: WorkloadManagerOptions) {} + constructor(private opts: WorkloadManagerOptions) { + if (opts.workloadApiDomain) { + this.logger.warn("[DockerWorkloadProvider] ⚠️ Custom workload API domain", { + domain: opts.workloadApiDomain, + }); + } + } async create(opts: WorkloadManagerCreateOptions) { this.logger.log("[DockerWorkloadProvider] Creating container", { opts }); @@ -24,7 +30,9 @@ export class DockerWorkloadManager implements WorkloadManager { `--env=TRIGGER_ENV_ID=${opts.envId}`, `--env=TRIGGER_RUN_ID=${opts.runFriendlyId}`, `--env=TRIGGER_SNAPSHOT_ID=${opts.snapshotFriendlyId}`, - `--env=TRIGGER_WORKER_API_URL=${this.opts.workerApiUrl}`, + `--env=TRIGGER_SUPERVISOR_API_PROTOCOL=${this.opts.workloadApiProtocol}`, + `--env=TRIGGER_SUPERVISOR_API_PORT=${this.opts.workloadApiPort}`, + `--env=TRIGGER_SUPERVISOR_API_DOMAIN=${this.opts.workloadApiDomain ?? getDockerHostDomain()}`, `--env=TRIGGER_WORKER_INSTANCE_NAME=${env.TRIGGER_WORKER_INSTANCE_NAME}`, `--env=OTEL_EXPORTER_OTLP_ENDPOINT=${env.OTEL_EXPORTER_OTLP_ENDPOINT}`, `--env=TRIGGER_RUNNER_ID=${runnerId}`, diff --git a/apps/supervisor/src/workloadManager/kubernetes.ts b/apps/supervisor/src/workloadManager/kubernetes.ts index ed8dd6ae8c..b4e4a7e19a 100644 --- a/apps/supervisor/src/workloadManager/kubernetes.ts +++ b/apps/supervisor/src/workloadManager/kubernetes.ts @@ -23,6 +23,12 @@ export class KubernetesWorkloadManager implements WorkloadManager { constructor(private opts: WorkloadManagerOptions) { this.k8s = createK8sApi(); + + if (opts.workloadApiDomain) { + this.logger.warn("[KubernetesWorkloadManager] ⚠️ Custom workload API domain", { + domain: opts.workloadApiDomain, + }); + } } async create(opts: WorkloadManagerCreateOptions) { @@ -72,8 +78,26 @@ export class KubernetesWorkloadManager implements WorkloadManager { value: opts.snapshotFriendlyId, }, { - name: "TRIGGER_WORKER_API_URL", - value: this.opts.workerApiUrl, + name: "TRIGGER_SUPERVISOR_API_PROTOCOL", + value: this.opts.workloadApiProtocol, + }, + { + name: "TRIGGER_SUPERVISOR_API_PORT", + value: `${this.opts.workloadApiPort}`, + }, + { + name: "TRIGGER_SUPERVISOR_API_DOMAIN", + ...(this.opts.workloadApiDomain + ? { + value: this.opts.workloadApiDomain, + } + : { + valueFrom: { + fieldRef: { + fieldPath: "status.hostIP", + }, + }, + }), }, { name: "TRIGGER_WORKER_INSTANCE_NAME", diff --git a/apps/supervisor/src/workloadManager/types.ts b/apps/supervisor/src/workloadManager/types.ts index 57874e5334..ea2046b631 100644 --- a/apps/supervisor/src/workloadManager/types.ts +++ b/apps/supervisor/src/workloadManager/types.ts @@ -1,7 +1,9 @@ import { type EnvironmentType, type MachinePreset } from "@trigger.dev/core/v3"; export interface WorkloadManagerOptions { - workerApiUrl: string; + workloadApiProtocol: "http" | "https"; + workloadApiDomain?: string; // If unset, will use orchestrator-specific default + workloadApiPort: number; warmStartUrl?: string; } diff --git a/packages/cli-v3/src/entryPoints/managed-run-controller.ts b/packages/cli-v3/src/entryPoints/managed-run-controller.ts index cf85dce006..a33e8382ff 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-controller.ts @@ -36,7 +36,9 @@ const Env = z.object({ NODE_EXTRA_CA_CERTS: z.string().optional(), // Set at runtime - TRIGGER_WORKER_API_URL: z.string().url(), + TRIGGER_SUPERVISOR_API_PROTOCOL: z.enum(["http", "https"]), + TRIGGER_SUPERVISOR_API_DOMAIN: z.string(), + TRIGGER_SUPERVISOR_API_PORT: z.coerce.number(), TRIGGER_WORKLOAD_CONTROLLER_ID: z.string().default(`controller_${randomUUID()}`), TRIGGER_ENV_ID: z.string(), TRIGGER_RUN_ID: z.string().optional(), // This is only useful for cold starts @@ -84,6 +86,8 @@ class ManagedRunController { private readonly snapshotPoller: HeartbeatService; private readonly snapshotPollIntervalSeconds: number; + private readonly workerApiUrl: string; + private state: | { phase: "RUN"; @@ -246,8 +250,10 @@ class ManagedRunController { this.heartbeatIntervalSeconds = opts.heartbeatIntervalSeconds || 30; this.snapshotPollIntervalSeconds = 5; + this.workerApiUrl = `${env.TRIGGER_SUPERVISOR_API_PROTOCOL}://${env.TRIGGER_SUPERVISOR_API_DOMAIN}:${env.TRIGGER_SUPERVISOR_API_PORT}`; + this.httpClient = new WorkloadHttpClient({ - workerApiUrl: env.TRIGGER_WORKER_API_URL, + workerApiUrl: this.workerApiUrl, deploymentId: env.TRIGGER_DEPLOYMENT_ID, runnerId: env.TRIGGER_RUNNER_ID, }); @@ -746,8 +752,7 @@ class ManagedRunController { } createSocket() { - const wsUrl = new URL(env.TRIGGER_WORKER_API_URL); - wsUrl.pathname = "/workload"; + const wsUrl = new URL("/workload", this.workerApiUrl); this.socket = io(wsUrl.href, { transports: ["websocket"], diff --git a/packages/core/src/v3/runEngineWorker/supervisor/session.ts b/packages/core/src/v3/runEngineWorker/supervisor/session.ts index fd6c81bebf..4ce3b312b4 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/session.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/session.ts @@ -11,6 +11,8 @@ import { getDefaultWorkerHeaders } from "./util.js"; import { HeartbeatService } from "../../utils/heartbeat.js"; type SupervisorSessionOptions = SupervisorClientCommonOptions & { + queueConsumerEnabled?: boolean; + runNotificationsEnabled?: boolean; heartbeatIntervalSeconds?: number; dequeueIntervalMs?: number; preDequeue?: PreDequeueFn; @@ -20,15 +22,21 @@ type SupervisorSessionOptions = SupervisorClientCommonOptions & { export class SupervisorSession extends EventEmitter { public readonly httpClient: SupervisorHttpClient; - private socket?: Socket; + private readonly runNotificationsEnabled: boolean; + private runNotificationsSocket?: Socket; + private readonly queueConsumerEnabled: boolean; private readonly queueConsumer: RunQueueConsumer; + private readonly heartbeatService: HeartbeatService; private readonly heartbeatIntervalSeconds: number; constructor(private opts: SupervisorSessionOptions) { super(); + this.runNotificationsEnabled = opts.runNotificationsEnabled ?? true; + this.queueConsumerEnabled = opts.queueConsumerEnabled ?? true; + this.httpClient = new SupervisorHttpClient(opts); this.queueConsumer = new RunQueueConsumer({ client: this.httpClient, @@ -76,12 +84,12 @@ export class SupervisorSession extends EventEmitter { subscribeToRunNotifications(runFriendlyIds: string[]) { console.log("[SupervisorSession] Subscribing to run notifications", { runFriendlyIds }); - if (!this.socket) { + if (!this.runNotificationsSocket) { console.error("[SupervisorSession] Socket not connected"); return; } - this.socket.emit("run:subscribe", { version: "1", runFriendlyIds }); + this.runNotificationsSocket.emit("run:subscribe", { version: "1", runFriendlyIds }); Promise.allSettled( runFriendlyIds.map((runFriendlyId) => @@ -96,12 +104,12 @@ export class SupervisorSession extends EventEmitter { unsubscribeFromRunNotifications(runFriendlyIds: string[]) { console.log("[SupervisorSession] Unsubscribing from run notifications", { runFriendlyIds }); - if (!this.socket) { + if (!this.runNotificationsSocket) { console.error("[SupervisorSession] Socket not connected"); return; } - this.socket.emit("run:unsubscribe", { version: "1", runFriendlyIds }); + this.runNotificationsSocket.emit("run:unsubscribe", { version: "1", runFriendlyIds }); Promise.allSettled( runFriendlyIds.map((runFriendlyId) => @@ -116,15 +124,15 @@ export class SupervisorSession extends EventEmitter { ); } - private createSocket() { + private createRunNotificationsSocket() { const wsUrl = new URL(this.opts.apiUrl); wsUrl.pathname = "/worker"; - this.socket = io(wsUrl.href, { + const socket = io(wsUrl.href, { transports: ["websocket"], extraHeaders: getDefaultWorkerHeaders(this.opts), }); - this.socket.on("run:notify", ({ version, run }) => { + socket.on("run:notify", ({ version, run }) => { console.log("[SupervisorSession][WS] Received run notification", { version, run }); this.emit("runNotification", { time: new Date(), run }); @@ -137,15 +145,17 @@ export class SupervisorSession extends EventEmitter { console.error("[SupervisorSession] Failed to send debug log", { error }); }); }); - this.socket.on("connect", () => { + socket.on("connect", () => { console.log("[SupervisorSession][WS] Connected to platform"); }); - this.socket.on("connect_error", (error) => { + socket.on("connect_error", (error) => { console.error("[SupervisorSession][WS] Connection error", { error }); }); - this.socket.on("disconnect", (reason, description) => { + socket.on("disconnect", (reason, description) => { console.log("[SupervisorSession][WS] Disconnected from platform", { reason, description }); }); + + return socket; } async start() { @@ -167,14 +177,25 @@ export class SupervisorSession extends EventEmitter { name: workerGroup.name, }); - this.queueConsumer.start(); - this.heartbeatService.start(); - this.createSocket(); + if (this.queueConsumerEnabled) { + console.log("[SupervisorSession] Queue consumer enabled"); + this.queueConsumer.start(); + this.heartbeatService.start(); + } else { + console.warn("[SupervisorSession] Queue consumer disabled"); + } + + if (this.runNotificationsEnabled) { + console.log("[SupervisorSession] Run notifications enabled"); + this.runNotificationsSocket = this.createRunNotificationsSocket(); + } else { + console.warn("[SupervisorSession] Run notifications disabled"); + } } async stop() { this.heartbeatService.stop(); - this.socket?.disconnect(); + this.runNotificationsSocket?.disconnect(); } private getHeartbeatBody(): WorkerApiHeartbeatRequestBody {