diff --git a/.changeset/proud-yaks-thank.md b/.changeset/proud-yaks-thank.md new file mode 100644 index 0000000000..4df1f028c1 --- /dev/null +++ b/.changeset/proud-yaks-thank.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/core": patch +--- + +Improvements to structured logger and conditional payload logging diff --git a/apps/coordinator/src/checkpointer.ts b/apps/coordinator/src/checkpointer.ts index 2d7876c77f..f1b55fa066 100644 --- a/apps/coordinator/src/checkpointer.ts +++ b/apps/coordinator/src/checkpointer.ts @@ -1,6 +1,5 @@ import { ExponentialBackoff } from "@trigger.dev/core/v3/apps"; import { testDockerCheckpoint } from "@trigger.dev/core/v3/apps"; -import { SimpleLogger } from "@trigger.dev/core/v3/apps"; import { nanoid } from "nanoid"; import fs from "node:fs/promises"; import { ChaosMonkey } from "./chaosMonkey"; @@ -8,6 +7,7 @@ import { Buildah, Crictl, Exec } from "./exec"; import { setTimeout } from "node:timers/promises"; import { TempFileCleaner } from "./cleaner"; import { numFromEnv, boolFromEnv } from "./util"; +import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger"; type CheckpointerInitializeReturn = { canCheckpoint: boolean; @@ -86,7 +86,7 @@ export class Checkpointer { #canCheckpoint = false; #dockerMode: boolean; - #logger = new SimpleLogger("[checkptr]"); + #logger = new SimpleStructuredLogger("checkpointer"); #abortControllers = new Map(); #failedCheckpoints = new Map(); #waitingForRetry = new Set(); @@ -137,7 +137,7 @@ export class Checkpointer { return this.#getInitReturn(true); } - this.#logger.error(testCheckpoint.message, testCheckpoint.error ?? ""); + this.#logger.error(testCheckpoint.message, { error: testCheckpoint.error }); return this.#getInitReturn(false); } diff --git a/apps/coordinator/src/cleaner.ts b/apps/coordinator/src/cleaner.ts index a3b50e2e91..58cfd24bb7 100644 --- a/apps/coordinator/src/cleaner.ts +++ b/apps/coordinator/src/cleaner.ts @@ -1,4 +1,4 @@ -import { SimpleLogger } from "@trigger.dev/core/v3/apps"; +import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger"; import { Exec } from "./exec"; import { setTimeout } from "timers/promises"; @@ -10,14 +10,18 @@ interface TempFileCleanerOptions { } export class TempFileCleaner { - private logger = new SimpleLogger("[tmp-cleaner]"); private enabled = false; - private exec = new Exec({ logger: this.logger }); - constructor(private opts: TempFileCleanerOptions) {} + private logger: SimpleStructuredLogger; + private exec: Exec; + + constructor(private opts: TempFileCleanerOptions) { + this.logger = new SimpleStructuredLogger("tmp-cleaner", undefined, { ...this.opts }); + this.exec = new Exec({ logger: this.logger }); + } async start() { - this.logger.log("start", this.opts); + this.logger.log("TempFileCleaner.start"); this.enabled = true; if (!this.opts.leadingEdge) { @@ -28,7 +32,7 @@ export class TempFileCleaner { try { await this.clean(); } catch (error) { - this.logger.error("error during tick", error); + this.logger.error("error during tick", { error }); } await this.wait(); @@ -36,7 +40,7 @@ export class TempFileCleaner { } stop() { - this.logger.log("stop", this.opts); + this.logger.log("TempFileCleaner.stop"); this.enabled = false; } @@ -89,7 +93,7 @@ export class TempFileCleaner { const rmOutput = await rm; if (rmOutput.stderr.length > 0) { - this.logger.error("delete unsuccessful", rmOutput); + this.logger.error("delete unsuccessful", { rmOutput }); return; } diff --git a/apps/coordinator/src/exec.ts b/apps/coordinator/src/exec.ts index 
c0d8c0c862..d0c7745b0b 100644 --- a/apps/coordinator/src/exec.ts +++ b/apps/coordinator/src/exec.ts @@ -1,4 +1,4 @@ -import { SimpleLogger } from "@trigger.dev/core/v3/apps"; +import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger"; import { randomUUID } from "crypto"; import { homedir } from "os"; import { type Result, x } from "tinyexec"; @@ -18,7 +18,7 @@ class TinyResult { } interface ExecOptions { - logger?: SimpleLogger; + logger?: SimpleStructuredLogger; abortSignal?: AbortSignal; logOutput?: boolean; trimArgs?: boolean; @@ -26,7 +26,7 @@ interface ExecOptions { } export class Exec { - private logger: SimpleLogger; + private logger: SimpleStructuredLogger; private abortSignal: AbortSignal | undefined; private logOutput: boolean; @@ -34,7 +34,7 @@ export class Exec { private neverThrow: boolean; constructor(opts: ExecOptions) { - this.logger = opts.logger ?? new SimpleLogger(); + this.logger = opts.logger ?? new SimpleStructuredLogger("exec"); this.abortSignal = opts.abortSignal; this.logOutput = opts.logOutput ?? true; @@ -103,7 +103,7 @@ interface BuildahOptions { export class Buildah { private id: string; - private logger: SimpleLogger; + private logger: SimpleStructuredLogger; private exec: Exec; private containers = new Set(); @@ -111,7 +111,7 @@ export class Buildah { constructor(opts: BuildahOptions) { this.id = opts.id ?? randomUUID(); - this.logger = new SimpleLogger(`[buildah][${this.id}]`); + this.logger = new SimpleStructuredLogger("buildah", undefined, { id: this.id }); this.exec = new Exec({ logger: this.logger, @@ -220,14 +220,14 @@ interface CrictlOptions { export class Crictl { private id: string; - private logger: SimpleLogger; + private logger: SimpleStructuredLogger; private exec: Exec; private archives = new Set(); constructor(opts: CrictlOptions) { this.id = opts.id ?? randomUUID(); - this.logger = new SimpleLogger(`[crictl][${this.id}]`); + this.logger = new SimpleStructuredLogger("crictl", undefined, { id: this.id }); this.exec = new Exec({ logger: this.logger, diff --git a/apps/coordinator/src/index.ts b/apps/coordinator/src/index.ts index 238dd45db7..d32731f4fa 100644 --- a/apps/coordinator/src/index.ts +++ b/apps/coordinator/src/index.ts @@ -3,6 +3,7 @@ import { Server } from "socket.io"; import { CoordinatorToPlatformMessages, CoordinatorToProdWorkerMessages, + omit, PlatformToCoordinatorMessages, ProdWorkerSocketData, ProdWorkerToCoordinatorMessages, @@ -11,12 +12,12 @@ import { import { ZodNamespace } from "@trigger.dev/core/v3/zodNamespace"; import { ZodSocketConnection } from "@trigger.dev/core/v3/zodSocket"; import { HttpReply, getTextBody } from "@trigger.dev/core/v3/apps"; -import { SimpleLogger } from "@trigger.dev/core/v3/apps"; import { ChaosMonkey } from "./chaosMonkey"; import { Checkpointer } from "./checkpointer"; import { boolFromEnv, numFromEnv } from "./util"; import { collectDefaultMetrics, register, Gauge } from "prom-client"; +import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger"; collectDefaultMetrics(); const HTTP_SERVER_PORT = Number(process.env.HTTP_SERVER_PORT || 8020); @@ -29,7 +30,7 @@ const PLATFORM_WS_PORT = process.env.PLATFORM_WS_PORT || 3030; const PLATFORM_SECRET = process.env.PLATFORM_SECRET || "coordinator-secret"; const SECURE_CONNECTION = ["1", "true"].includes(process.env.SECURE_CONNECTION ?? 
"false"); -const logger = new SimpleLogger(`[${NODE_NAME}]`); +const logger = new SimpleStructuredLogger("coordinator", undefined, { nodeName: NODE_NAME }); const chaosMonkey = new ChaosMonkey( !!process.env.CHAOS_MONKEY_ENABLED, !!process.env.CHAOS_MONKEY_DISABLE_ERRORS, @@ -104,7 +105,7 @@ class TaskCoordinator { // MARK: SOCKET: PLATFORM #createPlatformSocket() { if (!PLATFORM_ENABLED) { - console.log("INFO: platform connection disabled"); + logger.log("INFO: platform connection disabled"); return; } @@ -115,8 +116,12 @@ class TaskCoordinator { const host = PLATFORM_HOST; const port = Number(PLATFORM_WS_PORT); - logger.log(`connecting to platform: ${host}:${port}`); - logger.debug(`connecting with extra headers`, { extraHeaders }); + const platformLogger = new SimpleStructuredLogger("socket-platform", undefined, { + namespace: "coordinator", + }); + + platformLogger.log("connecting", { host, port }); + platformLogger.debug("connecting with extra headers", { extraHeaders }); const platformConnection = new ZodSocketConnection({ namespace: "coordinator", @@ -127,17 +132,31 @@ class TaskCoordinator { clientMessages: CoordinatorToPlatformMessages, serverMessages: PlatformToCoordinatorMessages, authToken: PLATFORM_SECRET, + logHandlerPayloads: false, handlers: { RESUME_AFTER_DEPENDENCY: async (message) => { + const log = platformLogger.child({ + eventName: "RESUME_AFTER_DEPENDENCY", + ...omit(message, "completions", "executions"), + completions: message.completions.map((c) => ({ + id: c.id, + ok: c.ok, + })), + executions: message.executions.length, + }); + + log.log("Handling RESUME_AFTER_DEPENDENCY"); + const taskSocket = await this.#getAttemptSocket(message.attemptFriendlyId); if (!taskSocket) { - logger.log("Socket for attempt not found", { - attemptFriendlyId: message.attemptFriendlyId, - }); + log.debug("Socket for attempt not found"); return; } + log.addFields({ socketId: taskSocket.id, socketData: taskSocket.data }); + log.log("Found task socket for RESUME_AFTER_DEPENDENCY"); + await chaosMonkey.call(); // In case the task resumed faster than we could checkpoint @@ -146,12 +165,22 @@ class TaskCoordinator { taskSocket.emit("RESUME_AFTER_DEPENDENCY", message); }, RESUME_AFTER_DEPENDENCY_WITH_ACK: async (message) => { + const log = platformLogger.child({ + eventName: "RESUME_AFTER_DEPENDENCY_WITH_ACK", + ...omit(message, "completions", "executions"), + completions: message.completions.map((c) => ({ + id: c.id, + ok: c.ok, + })), + executions: message.executions.length, + }); + + log.log("Handling RESUME_AFTER_DEPENDENCY_WITH_ACK"); + const taskSocket = await this.#getAttemptSocket(message.attemptFriendlyId); if (!taskSocket) { - logger.log("Socket for attempt not found", { - attemptFriendlyId: message.attemptFriendlyId, - }); + log.debug("Socket for attempt not found"); return { success: false, error: { @@ -161,11 +190,12 @@ class TaskCoordinator { }; } + log.addFields({ socketId: taskSocket.id, socketData: taskSocket.data }); + log.log("Found task socket for RESUME_AFTER_DEPENDENCY_WITH_ACK"); + //if this is set, we want to kill the process because it will be resumed with the checkpoint from the queue if (taskSocket.data.requiresCheckpointResumeWithMessage) { - logger.log("RESUME_AFTER_DEPENDENCY_WITH_ACK: Checkpoint is set so going to nack", { - socketData: taskSocket.data, - }); + log.log("RESUME_AFTER_DEPENDENCY_WITH_ACK: Checkpoint is set so going to nack"); return { success: false, @@ -189,41 +219,65 @@ class TaskCoordinator { }; }, RESUME_AFTER_DURATION: async (message) => { + 
const log = platformLogger.child({ + eventName: "RESUME_AFTER_DURATION", + ...message, + }); + + log.log("Handling RESUME_AFTER_DURATION"); + const taskSocket = await this.#getAttemptSocket(message.attemptFriendlyId); if (!taskSocket) { - logger.log("Socket for attempt not found", { - attemptFriendlyId: message.attemptFriendlyId, - }); + log.debug("Socket for attempt not found"); return; } + log.addFields({ socketId: taskSocket.id, socketData: taskSocket.data }); + log.log("Found task socket for RESUME_AFTER_DURATION"); + await chaosMonkey.call(); taskSocket.emit("RESUME_AFTER_DURATION", message); }, REQUEST_ATTEMPT_CANCELLATION: async (message) => { + const log = platformLogger.child({ + eventName: "REQUEST_ATTEMPT_CANCELLATION", + ...message, + }); + + log.log("Handling REQUEST_ATTEMPT_CANCELLATION"); + const taskSocket = await this.#getAttemptSocket(message.attemptFriendlyId); if (!taskSocket) { - logger.log("Socket for attempt not found", { - attemptFriendlyId: message.attemptFriendlyId, - }); + log.debug("Socket for attempt not found"); return; } + log.addFields({ socketId: taskSocket.id, socketData: taskSocket.data }); + log.log("Found task socket for REQUEST_ATTEMPT_CANCELLATION"); + taskSocket.emit("REQUEST_ATTEMPT_CANCELLATION", message); }, REQUEST_RUN_CANCELLATION: async (message) => { + const log = platformLogger.child({ + eventName: "REQUEST_RUN_CANCELLATION", + ...message, + }); + + log.log("Handling REQUEST_RUN_CANCELLATION"); + const taskSocket = await this.#getRunSocket(message.runId); if (!taskSocket) { - logger.log("Socket for run not found", { - runId: message.runId, - }); + log.debug("Socket for run not found"); return; } + log.addFields({ socketId: taskSocket.id, socketData: taskSocket.data }); + log.log("Found task socket for REQUEST_RUN_CANCELLATION"); + this.#cancelCheckpoint(message.runId); if (message.delayInMs) { @@ -239,20 +293,33 @@ class TaskCoordinator { } }, READY_FOR_RETRY: async (message) => { + const log = platformLogger.child({ + eventName: "READY_FOR_RETRY", + ...message, + }); + const taskSocket = await this.#getRunSocket(message.runId); if (!taskSocket) { - logger.log("Socket for attempt not found", { - runId: message.runId, - }); + log.debug("Socket for attempt not found"); return; } + log.addFields({ socketId: taskSocket.id, socketData: taskSocket.data }); + log.log("Found task socket for READY_FOR_RETRY"); + await chaosMonkey.call(); taskSocket.emit("READY_FOR_RETRY", message); }, DYNAMIC_CONFIG: async (message) => { + const log = platformLogger.child({ + eventName: "DYNAMIC_CONFIG", + ...message, + }); + + log.log("Handling DYNAMIC_CONFIG"); + this.#delayThresholdInMs = message.checkpointThresholdInMs; // The first time we receive a dynamic config, the worker namespace will be created @@ -335,13 +402,35 @@ class TaskCoordinator { next(); }, onConnection: async (socket, handler, sender) => { - const logger = new SimpleLogger(`[prod-worker][${socket.id}]`); + const logger = new SimpleStructuredLogger("ns-prod-worker", undefined, { + namespace: "prod-worker", + socketId: socket.id, + socketData: socket.data, + }); + + const getSocketMetadata = () => { + return { + attemptFriendlyId: socket.data.attemptFriendlyId, + attemptNumber: socket.data.attemptNumber, + requiresCheckpointResumeWithMessage: socket.data.requiresCheckpointResumeWithMessage, + }; + }; const getAttemptNumber = () => { return socket.data.attemptNumber ?
parseInt(socket.data.attemptNumber) : undefined; }; + const exitRun = () => { + logger.log("exitRun", getSocketMetadata()); + + socket.emit("REQUEST_EXIT", { + version: "v1", + }); + }; + const crashRun = async (error: { name: string; message: string; stack?: string }) => { + logger.error("crashRun", { ...getSocketMetadata(), error }); + try { this.#platformSocket?.send("RUN_CRASHED", { version: "v1", @@ -349,9 +438,7 @@ class TaskCoordinator { error, }); } finally { - socket.emit("REQUEST_EXIT", { - version: "v1", - }); + exitRun(); } }; @@ -370,7 +457,9 @@ class TaskCoordinator { reason?: string; } > => { - logger.log("readyToCheckpoint", { runId: socket.data.runId, reason }); + const log = logger.child(getSocketMetadata()); + + log.log("readyToCheckpoint", { runId: socket.data.runId, reason }); if (checkpointInProgress()) { return { @@ -401,10 +490,7 @@ class TaskCoordinator { success: true, }; } catch (error) { - logger.error("Error while waiting for checkpointable state", { - error, - runId: socket.data.runId, - }); + log.error("Error while waiting for checkpointable state", { error }); if (error instanceof CheckpointReadinessTimeoutError) { logger.error( @@ -448,14 +534,20 @@ class TaskCoordinator { }); socket.on("TEST", (message, callback) => { - logger.log("[TEST]", { runId: socket.data.runId, message }); + logger.log("Handling TEST", { eventName: "TEST", ...getSocketMetadata(), ...message }); callback(); }); // Deprecated: Only workers without support for lazy attempts use this socket.on("READY_FOR_EXECUTION", async (message) => { - logger.log("[READY_FOR_EXECUTION]", message); + const log = logger.child({ + eventName: "READY_FOR_EXECUTION", + ...getSocketMetadata(), + ...message, + }); + + log.log("Handling READY_FOR_EXECUTION"); try { const executionAck = await this.#platformSocket?.sendWithAck( @@ -464,7 +556,7 @@ class TaskCoordinator { ); if (!executionAck) { - logger.error("no execution ack", { runId: socket.data.runId }); + log.error("no execution ack"); await crashRun({ name: "ReadyForExecutionError", @@ -475,7 +567,7 @@ class TaskCoordinator { } if (!executionAck.success) { - logger.error("failed to get execution payload", { runId: socket.data.runId }); + log.error("failed to get execution payload"); await crashRun({ name: "ReadyForExecutionError", @@ -493,7 +585,7 @@ class TaskCoordinator { updateAttemptFriendlyId(executionAck.payload.execution.attempt.id); updateAttemptNumber(executionAck.payload.execution.attempt.number); } catch (error) { - logger.error("READY_FOR_EXECUTION error", { error, runId: socket.data.runId }); + log.error("READY_FOR_EXECUTION error", { error }); await crashRun({ name: "ReadyForExecutionError", @@ -507,7 +599,13 @@ class TaskCoordinator { // MARK: LAZY ATTEMPT socket.on("READY_FOR_LAZY_ATTEMPT", async (message) => { - logger.log("[READY_FOR_LAZY_ATTEMPT]", message); + const log = logger.child({ + eventName: "READY_FOR_LAZY_ATTEMPT", + ...getSocketMetadata(), + ...message, + }); + + log.log("Handling READY_FOR_LAZY_ATTEMPT"); try { const lazyAttempt = await this.#platformSocket?.sendWithAck("READY_FOR_LAZY_ATTEMPT", { @@ -516,7 +614,7 @@ class TaskCoordinator { }); if (!lazyAttempt) { - logger.error("no lazy attempt ack", { runId: socket.data.runId }); + log.error("no lazy attempt ack"); await crashRun({ name: "ReadyForLazyAttemptError", @@ -527,10 +625,7 @@ class TaskCoordinator { } if (!lazyAttempt.success) { - logger.error("failed to get lazy attempt payload", { - runId: socket.data.runId, - reason: lazyAttempt.reason, - }); + 
log.error("failed to get lazy attempt payload", { reason: lazyAttempt.reason }); await crashRun({ name: "ReadyForLazyAttemptError", @@ -548,11 +643,11 @@ class TaskCoordinator { }); } catch (error) { if (error instanceof ChaosMonkey.Error) { - logger.error("ChaosMonkey error, won't crash run", { runId: socket.data.runId }); + log.error("ChaosMonkey error, won't crash run"); return; } - logger.error("READY_FOR_LAZY_ATTEMPT error", { error, runId: socket.data.runId }); + log.error("READY_FOR_LAZY_ATTEMPT error", { error }); await crashRun({ name: "ReadyForLazyAttemptError", @@ -566,7 +661,13 @@ class TaskCoordinator { // MARK: RESUME READY socket.on("READY_FOR_RESUME", async (message) => { - logger.log("[READY_FOR_RESUME]", message); + const log = logger.child({ + eventName: "READY_FOR_RESUME", + ...getSocketMetadata(), + ...message, + }); + + log.log("Handling READY_FOR_RESUME"); updateAttemptFriendlyId(message.attemptFriendlyId); @@ -579,9 +680,19 @@ class TaskCoordinator { // MARK: RUN COMPLETED socket.on("TASK_RUN_COMPLETED", async (message, callback) => { - const { completion, execution } = message; + const log = logger.child({ + eventName: "TASK_RUN_COMPLETED", + ...getSocketMetadata(), + ...omit(message, "completion", "execution"), + completion: { + id: message.completion.id, + ok: message.completion.ok, + }, + }); - logger.log("completed task", { completionId: completion.id }); + log.log("Handling TASK_RUN_COMPLETED"); + + const { completion, execution } = message; // Cancel all in-progress checkpoints (if any) this.#cancelCheckpoint(socket.data.runId); @@ -646,10 +757,7 @@ class TaskCoordinator { const ready = await readyToCheckpoint("RETRY"); if (!ready.success) { - logger.error("Failed to become checkpointable", { - runId: socket.data.runId, - reason: ready.reason, - }); + log.error("Failed to become checkpointable", { reason: ready.reason }); return; } @@ -662,11 +770,13 @@ class TaskCoordinator { }); if (!checkpoint) { - logger.error("Failed to checkpoint", { runId: socket.data.runId }); + log.error("Failed to checkpoint"); completeWithoutCheckpoint(false); return; } + log.addFields({ checkpoint }); + this.#platformSocket?.send("TASK_RUN_COMPLETED", { version: "v1", execution, @@ -675,15 +785,22 @@ class TaskCoordinator { }); if (!checkpoint.docker || !willSimulate) { - socket.emit("REQUEST_EXIT", { - version: "v1", - }); + exitRun(); } }); // MARK: TASK FAILED socket.on("TASK_RUN_FAILED_TO_RUN", async ({ completion }) => { - logger.log("task failed to run", { completionId: completion.id }); + const log = logger.child({ + eventName: "TASK_RUN_FAILED_TO_RUN", + ...getSocketMetadata(), + completion: { + id: completion.id, + ok: completion.ok, + }, + }); + + log.log("Handling TASK_RUN_FAILED_TO_RUN"); // Cancel all in-progress checkpoints (if any) this.#cancelCheckpoint(socket.data.runId); @@ -693,19 +810,23 @@ class TaskCoordinator { completion, }); - socket.emit("REQUEST_EXIT", { - version: "v1", - }); + exitRun(); }); // MARK: CHECKPOINT socket.on("READY_FOR_CHECKPOINT", async (message) => { - logger.log("[READY_FOR_CHECKPOINT]", message); + const log = logger.child({ + eventName: "READY_FOR_CHECKPOINT", + ...getSocketMetadata(), + ...message, + }); + + log.log("Handling READY_FOR_CHECKPOINT"); const checkpointable = this.#checkpointableTasks.get(socket.data.runId); if (!checkpointable) { - logger.error("No checkpoint scheduled", { runId: socket.data.runId }); + log.error("No checkpoint scheduled"); return; } @@ -714,7 +835,13 @@ class TaskCoordinator { // MARK: CXX CHECKPOINT 
socket.on("CANCEL_CHECKPOINT", async (message, callback) => { - logger.log("[CANCEL_CHECKPOINT]", message); + const log = logger.child({ + eventName: "CANCEL_CHECKPOINT", + ...getSocketMetadata(), + ...message, + }); + + log.log("Handling CANCEL_CHECKPOINT"); if (message.version === "v1") { this.#cancelCheckpoint(socket.data.runId); @@ -729,12 +856,18 @@ class TaskCoordinator { // MARK: DURATION WAIT socket.on("WAIT_FOR_DURATION", async (message, callback) => { - logger.log("[WAIT_FOR_DURATION]", message); + const log = logger.child({ + eventName: "WAIT_FOR_DURATION", + ...getSocketMetadata(), + ...message, + }); + + log.log("Handling WAIT_FOR_DURATION"); await chaosMonkey.call({ throwErrors: false }); if (checkpointInProgress()) { - logger.error("Checkpoint already in progress", { runId: socket.data.runId }); + log.error("Checkpoint already in progress"); callback({ willCheckpointAndRestore: false }); return; } @@ -752,10 +885,7 @@ class TaskCoordinator { const ready = await readyToCheckpoint("WAIT_FOR_DURATION"); if (!ready.success) { - logger.error("Failed to become checkpointable", { - runId: socket.data.runId, - reason: ready.reason, - }); + log.error("Failed to become checkpointable", { reason: ready.reason }); return; } @@ -768,10 +898,12 @@ class TaskCoordinator { if (!checkpoint) { // The task container will keep running until the wait duration has elapsed - logger.error("Failed to checkpoint", { runId: socket.data.runId }); + log.error("Failed to checkpoint"); return; } + log.addFields({ checkpoint }); + const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", { version: "v1", attemptFriendlyId: message.attemptFriendlyId, @@ -785,25 +917,29 @@ class TaskCoordinator { }); if (ack?.keepRunAlive) { - logger.log("keeping run alive after duration checkpoint", { runId: socket.data.runId }); + log.log("keeping run alive after duration checkpoint"); return; } if (!checkpoint.docker || !willSimulate) { - socket.emit("REQUEST_EXIT", { - version: "v1", - }); + exitRun(); } }); // MARK: TASK WAIT socket.on("WAIT_FOR_TASK", async (message, callback) => { - logger.log("[WAIT_FOR_TASK]", message); + const log = logger.child({ + eventName: "WAIT_FOR_TASK", + ...getSocketMetadata(), + ...message, + }); + + log.log("Handling WAIT_FOR_TASK"); await chaosMonkey.call({ throwErrors: false }); if (checkpointInProgress()) { - logger.error("Checkpoint already in progress", { runId: socket.data.runId }); + log.error("Checkpoint already in progress"); callback({ willCheckpointAndRestore: false }); return; } @@ -823,10 +959,7 @@ class TaskCoordinator { const ready = await readyToCheckpoint("WAIT_FOR_TASK"); if (!ready.success) { - logger.error("Failed to become checkpointable", { - runId: socket.data.runId, - reason: ready.reason, - }); + log.error("Failed to become checkpointable", { reason: ready.reason }); return; } } @@ -839,21 +972,17 @@ class TaskCoordinator { }); if (!checkpoint) { - logger.error("Failed to checkpoint", { runId: socket.data.runId }); + log.error("Failed to checkpoint"); return; } - logger.log("WAIT_FOR_TASK checkpoint created", { - checkpoint, - socketData: socket.data, - }); + log.addFields({ checkpoint }); + + log.log("WAIT_FOR_TASK checkpoint created"); //setting this means we can only resume from a checkpoint socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`; - logger.log("WAIT_FOR_TASK set requiresCheckpointResumeWithMessage", { - checkpoint, - socketData: socket.data, - }); + log.log("WAIT_FOR_TASK 
set requiresCheckpointResumeWithMessage"); const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", { version: "v1", @@ -868,25 +997,29 @@ class TaskCoordinator { if (ack?.keepRunAlive) { socket.data.requiresCheckpointResumeWithMessage = undefined; - logger.log("keeping run alive after task checkpoint", { runId: socket.data.runId }); + log.log("keeping run alive after task checkpoint"); return; } if (!checkpoint.docker || !willSimulate) { - socket.emit("REQUEST_EXIT", { - version: "v1", - }); + exitRun(); } }); // MARK: BATCH WAIT socket.on("WAIT_FOR_BATCH", async (message, callback) => { - logger.log("[WAIT_FOR_BATCH]", message); + const log = logger.child({ + eventName: "WAIT_FOR_BATCH", + ...getSocketMetadata(), + ...message, + }); + + log.log("Handling WAIT_FOR_BATCH"); await chaosMonkey.call({ throwErrors: false }); if (checkpointInProgress()) { - logger.error("Checkpoint already in progress", { runId: socket.data.runId }); + log.error("Checkpoint already in progress"); callback({ willCheckpointAndRestore: false }); return; } @@ -906,10 +1039,7 @@ class TaskCoordinator { const ready = await readyToCheckpoint("WAIT_FOR_BATCH"); if (!ready.success) { - logger.error("Failed to become checkpointable", { - runId: socket.data.runId, - reason: ready.reason, - }); + log.error("Failed to become checkpointable", { reason: ready.reason }); return; } } @@ -922,21 +1052,17 @@ class TaskCoordinator { }); if (!checkpoint) { - logger.error("Failed to checkpoint", { runId: socket.data.runId }); + log.error("Failed to checkpoint"); return; } - logger.log("WAIT_FOR_BATCH checkpoint created", { - checkpoint, - socketData: socket.data, - }); + log.addFields({ checkpoint }); + + log.log("WAIT_FOR_BATCH checkpoint created"); //setting this means we can only resume from a checkpoint socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`; - logger.log("WAIT_FOR_BATCH set checkpoint", { - checkpoint, - socketData: socket.data, - }); + log.log("WAIT_FOR_BATCH set checkpoint"); const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", { version: "v1", @@ -952,20 +1078,24 @@ class TaskCoordinator { if (ack?.keepRunAlive) { socket.data.requiresCheckpointResumeWithMessage = undefined; - logger.log("keeping run alive after batch checkpoint", { runId: socket.data.runId }); + log.log("keeping run alive after batch checkpoint"); return; } if (!checkpoint.docker || !willSimulate) { - socket.emit("REQUEST_EXIT", { - version: "v1", - }); + exitRun(); } }); // MARK: INDEX socket.on("INDEX_TASKS", async (message, callback) => { - logger.log("[INDEX_TASKS]", message); + const log = logger.child({ + eventName: "INDEX_TASKS", + ...getSocketMetadata(), + ...message, + }); + + log.log("Handling INDEX_TASKS"); const workerAck = await this.#platformSocket?.sendWithAck("CREATE_WORKER", { version: "v2", @@ -981,7 +1111,7 @@ class TaskCoordinator { }); if (!workerAck) { - logger.debug("no worker ack while indexing", message); + log.debug("no worker ack while indexing"); } callback({ success: !!workerAck?.success }); @@ -989,7 +1119,13 @@ class TaskCoordinator { // MARK: INDEX FAILED socket.on("INDEXING_FAILED", async (message) => { - logger.log("[INDEXING_FAILED]", message); + const log = logger.child({ + eventName: "INDEXING_FAILED", + ...getSocketMetadata(), + ...message, + }); + + log.log("Handling INDEXING_FAILED"); this.#platformSocket?.send("INDEXING_FAILED", { version: "v1", @@ -1000,7 +1136,13 @@ class TaskCoordinator { // MARK:
CREATE ATTEMPT socket.on("CREATE_TASK_RUN_ATTEMPT", async (message, callback) => { - logger.log("[CREATE_TASK_RUN_ATTEMPT]", message); + const log = logger.child({ + eventName: "CREATE_TASK_RUN_ATTEMPT", + ...getSocketMetadata(), + ...message, + }); + + log.log("Handling CREATE_TASK_RUN_ATTEMPT"); await chaosMonkey.call({ throwErrors: false }); @@ -1010,10 +1152,7 @@ class TaskCoordinator { }); if (!createAttempt?.success) { - logger.debug("no ack while creating attempt", { - runId: message.runId, - reason: createAttempt?.reason, - }); + log.debug("no ack while creating attempt", { reason: createAttempt?.reason }); callback({ success: false, reason: createAttempt?.reason }); return; } @@ -1028,13 +1167,25 @@ class TaskCoordinator { }); socket.on("UNRECOVERABLE_ERROR", async (message) => { - logger.log("[UNRECOVERABLE_ERROR]", message); + const log = logger.child({ + eventName: "UNRECOVERABLE_ERROR", + ...getSocketMetadata(), + error: message.error, + }); + + log.log("Handling UNRECOVERABLE_ERROR"); await crashRun(message.error); }); socket.on("SET_STATE", async (message) => { - logger.log("[SET_STATE]", message); + const log = logger.child({ + eventName: "SET_STATE", + ...getSocketMetadata(), + ...message, + }); + + log.log("Handling SET_STATE"); if (message.attemptFriendlyId) { updateAttemptFriendlyId(message.attemptFriendlyId); @@ -1090,7 +1241,7 @@ class TaskCoordinator { // MARK: HTTP SERVER #createHttpServer() { const httpServer = createServer(async (req, res) => { - logger.log(`[${req.method}]`, req.url); + logger.log(`[${req.method}]`, { url: req.url }); const reply = new HttpReply(res); @@ -1120,7 +1271,7 @@ class TaskCoordinator { }); httpServer.on("listening", () => { - logger.log("server listening on port", HTTP_SERVER_PORT); + logger.log("server listening on port", { port: HTTP_SERVER_PORT }); }); return httpServer; diff --git a/apps/docker-provider/src/index.ts b/apps/docker-provider/src/index.ts index cfdaeb147c..9a4ea0160c 100644 --- a/apps/docker-provider/src/index.ts +++ b/apps/docker-provider/src/index.ts @@ -20,7 +20,7 @@ const OTEL_EXPORTER_OTLP_ENDPOINT = process.env.OTEL_EXPORTER_OTLP_ENDPOINT || "http://0.0.0.0:4318"; const FORCE_CHECKPOINT_SIMULATION = ["1", "true"].includes( - process.env.FORCE_CHECKPOINT_SIMULATION ?? "true" + process.env.FORCE_CHECKPOINT_SIMULATION ?? "false" ); const logger = new SimpleLogger(`[${MACHINE_NAME}]`); diff --git a/packages/core/src/v3/utils/structuredLogger.ts b/packages/core/src/v3/utils/structuredLogger.ts index cc38cdb7e7..72c675aecd 100644 --- a/packages/core/src/v3/utils/structuredLogger.ts +++ b/packages/core/src/v3/utils/structuredLogger.ts @@ -60,6 +60,13 @@ export class SimpleStructuredLogger implements StructuredLogger { this.#structuredLog(console.debug, message, "debug", ...args); } + addFields(fields: Record) { + this.fields = { + ...this.fields, + ...fields, + }; + } + #structuredLog( loggerFunction: (message: string, ...args: any[]) => void, message: string, @@ -67,12 +74,12 @@ export class SimpleStructuredLogger implements StructuredLogger { ...args: StructuredArgs ) { const structuredLog = { - ...(args.length === 1 ? args[0] : args), - ...this.fields, timestamp: new Date(), - name: this.name, message, - level, + $name: this.name, + $level: level, + ...this.fields, + ...(args.length === 1 ? 
args[0] : args), }; loggerFunction(JSON.stringify(structuredLog)); diff --git a/packages/core/src/v3/zodNamespace.ts b/packages/core/src/v3/zodNamespace.ts index 5fd2b01e5a..38316d6618 100644 --- a/packages/core/src/v3/zodNamespace.ts +++ b/packages/core/src/v3/zodNamespace.ts @@ -41,6 +41,7 @@ interface ZodNamespaceOptions< handlers?: ZodSocketMessageHandlers; authToken?: string; logger?: StructuredLogger; + logHandlerPayloads?: boolean; preAuth?: ( socket: ZodNamespaceSocket, next: (err?: ExtendedError) => void, @@ -91,11 +92,16 @@ export class ZodNamespace< constructor( opts: ZodNamespaceOptions ) { - this.#logger = opts.logger ?? new SimpleStructuredLogger(opts.name); + this.#logger = + opts.logger ?? + new SimpleStructuredLogger(`ns-${opts.name}`, undefined, { + namespace: opts.name, + }); this.#handler = new ZodSocketMessageHandler({ schema: opts.clientMessages, handlers: opts.handlers, + logPayloads: opts.logHandlerPayloads, }); this.io = opts.io; diff --git a/packages/core/src/v3/zodSocket.ts b/packages/core/src/v3/zodSocket.ts index 5534e04eb6..66d45ef120 100644 --- a/packages/core/src/v3/zodSocket.ts +++ b/packages/core/src/v3/zodSocket.ts @@ -68,6 +68,7 @@ export type ZodSocketMessageHandlerOptions; logger?: StructuredLogger; + logPayloads?: boolean; }; type MessageFromSocketSchema< @@ -92,12 +93,14 @@ export class ZodSocketMessageHandler | undefined; #logger: StructuredLogger; + #logPayloads: boolean; constructor(options: ZodSocketMessageHandlerOptions) { this.#schema = options.schema; this.#handlers = options.handlers; this.#logger = options.logger ?? new SimpleStructuredLogger("socket-message-handler", LogLevel.info); + this.#logPayloads = options.logPayloads ?? !!process.env.LOG_SOCKET_HANDLER_PAYLOADS ?? false; } public async handleMessage(message: unknown) { @@ -120,7 +123,7 @@ export class ZodSocketMessageHandler => { - log.info(`handling ${eventName}`, { - payload: message, + log.info(`Incoming event ${eventName}`, { + eventName, + ...(this.#logPayloads ? { eventMessage: message } : {}), hasCallback: !!callback, }); @@ -344,6 +348,7 @@ interface ZodSocketConnectionOptions< handlers?: ZodSocketMessageHandlers; authToken?: string; ioOptions?: Partial; + logHandlerPayloads?: boolean; onConnection?: ( socket: ZodSocket, handler: ZodSocketMessageHandler, @@ -378,7 +383,14 @@ export class ZodSocketConnection< opts.port ?? (opts.secure ? "443" : "80") }/${opts.namespace}`; - const logger = new SimpleStructuredLogger(opts.namespace, LogLevel.info); + const logger = new SimpleStructuredLogger(`socket-${opts.namespace}`, LogLevel.info, { + namespace: opts.namespace, + host: opts.host, + port: opts.port, + secure: opts.secure, + extraHeaders: opts.extraHeaders, + }); + logger.log("new zod socket", { uri }); this.socket = io(uri, { @@ -399,6 +411,7 @@ export class ZodSocketConnection< this.#handler = new ZodSocketMessageHandler({ schema: opts.serverMessages, handlers: opts.handlers, + logPayloads: opts.logHandlerPayloads, }); this.#handler.registerHandlers(this.socket, this.#logger);
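
A minimal usage sketch of the structured logger API this patch builds on — the `(name, level, fields)` constructor, `child()`, and the new `addFields()`. It is illustrative only: the field values (`node-a`, `run_1234`, the checkpoint object) are placeholders, not values taken from this patch.

```ts
import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger";

// Root logger: a name, an optional level (undefined = default), and static
// fields that are attached to every line it emits.
const logger = new SimpleStructuredLogger("coordinator", undefined, { nodeName: "node-a" });

// Child loggers inherit the parent's fields and add their own, e.g. per event.
const log = logger.child({ eventName: "WAIT_FOR_TASK", runId: "run_1234" });
log.log("Handling WAIT_FOR_TASK");

// addFields() (added in this patch) attaches context discovered mid-handler,
// so later lines carry it without repeating it at each call site.
log.addFields({ checkpoint: { location: "placeholder", docker: false } });
log.error("Failed to become checkpointable", { reason: "timeout" });

// Each call emits a single JSON line shaped like:
// {"timestamp":"...","message":"...","$name":"...","$level":"error", ...fields, ...args}
```

Payload logging for socket message handlers is now conditional: pass `logHandlerPayloads: true` to `ZodSocketConnection`/`ZodNamespace` (or set `LOG_SOCKET_HANDLER_PAYLOADS`) to include the full `eventMessage` in the `Incoming event` log line; otherwise only `eventName` and `hasCallback` are logged.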