From e03025ce59992673c38f5149842ed29dbca5589c Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Mon, 27 Jan 2025 17:05:47 +0000 Subject: [PATCH 1/2] Various fixes for run engine v1 - Make sure there are connected providers before sending a scheduled attempt message, nack and retry if there are not - Fail runs that fail task heartbeats when pending and locked - More and better logging around shared queue consumer - Fix bug when failing a task run with no attempt --- apps/webapp/app/v3/failedTaskRun.server.ts | 25 +++--- .../v3/marqs/sharedQueueConsumer.server.ts | 89 +++++++++++++------ apps/webapp/app/v3/requeueTaskRun.server.ts | 25 +++++- .../services/createTaskRunAttempt.server.ts | 7 +- apps/webapp/app/v3/sharedSocketConnection.ts | 8 ++ packages/core/src/logger.ts | 43 +++++++++ packages/core/src/v3/zodMessageHandler.ts | 10 +++ 7 files changed, 162 insertions(+), 45 deletions(-) diff --git a/apps/webapp/app/v3/failedTaskRun.server.ts b/apps/webapp/app/v3/failedTaskRun.server.ts index 9c0167fc57..f26a343f0a 100644 --- a/apps/webapp/app/v3/failedTaskRun.server.ts +++ b/apps/webapp/app/v3/failedTaskRun.server.ts @@ -14,20 +14,21 @@ import { CreateTaskRunAttemptService } from "./services/createTaskRunAttempt.ser import { sharedQueueTasks } from "./marqs/sharedQueueConsumer.server"; import * as semver from "semver"; -const includeAttempts = { - attempts: { - orderBy: { - createdAt: "desc", +const FailedTaskRunRetryGetPayload = { + select: { + id: true, + attempts: { + orderBy: { + createdAt: "desc", + }, + take: 1, }, - take: 1, + lockedById: true, // task + lockedToVersionId: true, // worker }, - lockedBy: true, // task - lockedToVersion: true, // worker -} satisfies Prisma.TaskRunInclude; +} as const; -type TaskRunWithAttempts = Prisma.TaskRunGetPayload<{ - include: typeof includeAttempts; -}>; +type TaskRunWithAttempts = Prisma.TaskRunGetPayload; export class FailedTaskRunService extends BaseService { public async call(anyRunId: string, completion: TaskRunFailedExecutionResult) { @@ -92,7 +93,7 @@ export class FailedTaskRunRetryHelper extends BaseService { where: { id: runId, }, - include: includeAttempts, + ...FailedTaskRunRetryGetPayload, }); if (!taskRun) { diff --git a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts index d2dc99a249..3d77595194 100644 --- a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts @@ -13,7 +13,6 @@ import { MachinePreset, ProdTaskRunExecution, ProdTaskRunExecutionPayload, - QueueOptions, TaskRunError, TaskRunErrorCodes, TaskRunExecution, @@ -29,13 +28,13 @@ import { BackgroundWorker, BackgroundWorkerTask, Prisma, - TaskQueue, TaskRunStatus, } from "@trigger.dev/database"; import { z } from "zod"; import { $replica, prisma } from "~/db.server"; import { env } from "~/env.server"; import { findEnvironmentById } from "~/models/runtimeEnvironment.server"; +import { findQueueInEnvironment, sanitizeQueueName } from "~/models/taskQueue.server"; import { generateJWTTokenForEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { singleton } from "~/utils/singleton"; @@ -67,7 +66,6 @@ import { import { tracer } from "../tracer.server"; import { getMaxDuration } from "../utils/maxDuration"; import { MessagePayload } from "./types"; -import { findQueueInEnvironment, sanitizeQueueName } from "~/models/taskQueue.server"; const WithTraceContext = z.object({ traceparent: z.string().optional(), @@ -323,6 +321,14 @@ export class SharedQueueConsumer { ROOT_CONTEXT ); + logger.debug("SharedQueueConsumer starting new trace", { + reasonStats: this._reasonStats, + actionStats: this._actionStats, + outcomeStats: this._outcomeStats, + iterationCount: this._iterationsCount, + consumerId: this._id, + }); + // Get the span trace context this._currentSpanContext = trace.setSpan(ROOT_CONTEXT, this._currentSpan); @@ -351,6 +357,10 @@ export class SharedQueueConsumer { try { const result = await this.#doWorkInternal(); + if (result.reason !== "no_message_dequeued") { + logger.debug("SharedQueueConsumer doWorkInternal result", { result }); + } + this._reasonStats[result.reason] = (this._reasonStats[result.reason] ?? 0) + 1; this._outcomeStats[result.outcome] = (this._outcomeStats[result.outcome] ?? 0) + 1; @@ -371,6 +381,9 @@ export class SharedQueueConsumer { if (result.error) { span.recordException(result.error); span.setStatus({ code: SpanStatusCode.ERROR }); + this._currentSpan?.recordException(result.error); + this._currentSpan?.setStatus({ code: SpanStatusCode.ERROR }); + this._endSpanInNextIteration = true; } if (typeof result.interval === "number") { @@ -755,7 +768,7 @@ export class SharedQueueConsumer { ); if (!queue) { - logger.debug("SharedQueueConsumer queue not found, so nacking message", { + logger.debug("SharedQueueConsumer queue not found, so acking message", { queueMessage: message, taskRunQueue: lockedTaskRun.queue, runtimeEnvironmentId: lockedTaskRun.runtimeEnvironmentId, @@ -876,33 +889,49 @@ export class SharedQueueConsumer { machinePresetFromRun(lockedTaskRun) ?? machinePresetFromConfig(lockedTaskRun.lockedBy?.machineConfig ?? {}); - await this.#startActiveSpan("scheduleAttemptOnProvider", async (span) => { - await this._providerSender.send("BACKGROUND_WORKER_MESSAGE", { - backgroundWorkerId: worker.friendlyId, - data: { - type: "SCHEDULE_ATTEMPT", - image: imageReference, - version: deployment.version, - machine, - nextAttemptNumber, - // identifiers - id: "placeholder", // TODO: Remove this completely in a future release - envId: lockedTaskRun.runtimeEnvironment.id, - envType: lockedTaskRun.runtimeEnvironment.type, - orgId: lockedTaskRun.runtimeEnvironment.organizationId, - projectId: lockedTaskRun.runtimeEnvironment.projectId, - runId: lockedTaskRun.id, - }, + return await this.#startActiveSpan("scheduleAttemptOnProvider", async (span) => { + span.setAttributes({ + run_id: lockedTaskRun.id, }); - }); - return { - action: "noop", - reason: "scheduled_attempt", - attrs: { - next_attempt_number: nextAttemptNumber, - }, - }; + if (await this._providerSender.validateCanSendMessage()) { + await this._providerSender.send("BACKGROUND_WORKER_MESSAGE", { + backgroundWorkerId: worker.friendlyId, + data: { + type: "SCHEDULE_ATTEMPT", + image: imageReference, + version: deployment.version, + machine, + nextAttemptNumber, + // identifiers + id: "placeholder", // TODO: Remove this completely in a future release + envId: lockedTaskRun.runtimeEnvironment.id, + envType: lockedTaskRun.runtimeEnvironment.type, + orgId: lockedTaskRun.runtimeEnvironment.organizationId, + projectId: lockedTaskRun.runtimeEnvironment.projectId, + runId: lockedTaskRun.id, + }, + }); + + return { + action: "noop", + reason: "scheduled_attempt", + attrs: { + next_attempt_number: nextAttemptNumber, + }, + }; + } else { + return { + action: "nack_and_do_more_work", + reason: "provider_not_connected", + attrs: { + run_id: lockedTaskRun.id, + }, + interval: this._options.nextTickInterval, + retryInMs: 5_000, + }; + } + }); } } catch (e) { // We now need to unlock the task run and delete the task run attempt @@ -929,6 +958,8 @@ export class SharedQueueConsumer { action: "nack_and_do_more_work", reason: "failed_to_schedule_attempt", error: e instanceof Error ? e : String(e), + interval: this._options.nextTickInterval, + retryInMs: 5_000, }; } } diff --git a/apps/webapp/app/v3/requeueTaskRun.server.ts b/apps/webapp/app/v3/requeueTaskRun.server.ts index 9105588c68..548a7e7d43 100644 --- a/apps/webapp/app/v3/requeueTaskRun.server.ts +++ b/apps/webapp/app/v3/requeueTaskRun.server.ts @@ -19,6 +19,7 @@ export class RequeueTaskRunService extends BaseService { id: true, friendlyId: true, status: true, + lockedAt: true, runtimeEnvironment: { select: { type: true, @@ -42,9 +43,29 @@ export class RequeueTaskRunService extends BaseService { switch (taskRun.status) { case "PENDING": { - logger.debug("[RequeueTaskRunService] Requeueing task run", { taskRun }); + if (taskRun.lockedAt) { + logger.debug( + "[RequeueTaskRunService] Failing task run because the heartbeat failed and it's PENDING but locked", + { taskRun } + ); + + const service = new FailedTaskRunService(); + + await service.call(taskRun.friendlyId, { + ok: false, + id: taskRun.friendlyId, + retry: undefined, + error: { + type: "INTERNAL_ERROR", + code: TaskRunErrorCodes.TASK_RUN_HEARTBEAT_TIMEOUT, + message: "Did not receive a heartbeat from the worker in time", + }, + }); + } else { + logger.debug("[RequeueTaskRunService] Nacking task run", { taskRun }); - await marqs?.nackMessage(taskRun.id); + await marqs?.nackMessage(taskRun.id); + } break; } diff --git a/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts b/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts index bdf8cf3781..8f2dcc7dcc 100644 --- a/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts +++ b/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts @@ -254,9 +254,12 @@ async function getAuthenticatedEnvironmentFromRun( friendlyId: string, prismaClient?: PrismaClientOrTransaction ) { - const taskRun = await (prismaClient ?? prisma).taskRun.findUnique({ + const isFriendlyId = friendlyId.startsWith("run_"); + + const taskRun = await (prismaClient ?? prisma).taskRun.findFirst({ where: { - friendlyId, + id: !isFriendlyId ? friendlyId : undefined, + friendlyId: isFriendlyId ? friendlyId : undefined, }, include: { runtimeEnvironment: { diff --git a/apps/webapp/app/v3/sharedSocketConnection.ts b/apps/webapp/app/v3/sharedSocketConnection.ts index 67d4732e28..2e644b7f27 100644 --- a/apps/webapp/app/v3/sharedSocketConnection.ts +++ b/apps/webapp/app/v3/sharedSocketConnection.ts @@ -80,6 +80,14 @@ export class SharedSocketConnection { } }); }, + canSendMessage() { + // Return true if there is at least 1 connected socket on the namespace + if (opts.namespace.sockets.size === 0) { + return false; + } + + return Array.from(opts.namespace.sockets.values()).some((socket) => socket.connected); + }, }); logger.debug("Starting SharedQueueConsumer pool", { diff --git a/packages/core/src/logger.ts b/packages/core/src/logger.ts index 90ddc81b1d..d46ee06012 100644 --- a/packages/core/src/logger.ts +++ b/packages/core/src/logger.ts @@ -96,12 +96,17 @@ export class Logger { // Get the current context from trace if it exists const currentSpan = trace.getSpan(context.active()); + const structuredError = extractStructuredErrorFromArgs(...args); + const structuredMessage = extractStructuredMessageFromArgs(...args); + const structuredLog = { ...structureArgs(safeJsonClone(args) as Record[], this.#filteredKeys), ...this.#additionalFields(), + ...(structuredError ? { error: structuredError } : {}), timestamp: new Date(), name: this.#name, message, + ...(structuredMessage ? { $message: structuredMessage } : {}), level, traceId: currentSpan && currentSpan.isRecording() ? currentSpan?.spanContext().traceId : undefined, @@ -118,6 +123,44 @@ export class Logger { } } +// Detect if args is an error object +// Or if args contains an error object at the "error" key +// In both cases, return the error object as a structured error +function extractStructuredErrorFromArgs(...args: Array | undefined>) { + const error = args.find((arg) => arg instanceof Error) as Error | undefined; + + if (error) { + return { + message: error.message, + stack: error.stack, + name: error.name, + }; + } + + const structuredError = args.find((arg) => arg?.error); + + if (structuredError && structuredError.error instanceof Error) { + return { + message: structuredError.error.message, + stack: structuredError.error.stack, + name: structuredError.error.name, + }; + } + + return; +} + +function extractStructuredMessageFromArgs(...args: Array | undefined>) { + // Check to see if there is a `message` key in the args, and if so, return it + const structuredMessage = args.find((arg) => arg?.message); + + if (structuredMessage) { + return structuredMessage.message; + } + + return; +} + function createReplacer(replacer?: (key: string, value: unknown) => unknown) { return (key: string, value: unknown) => { if (typeof value === "bigint") { diff --git a/packages/core/src/v3/zodMessageHandler.ts b/packages/core/src/v3/zodMessageHandler.ts index c380529fb4..7e1f71fd55 100644 --- a/packages/core/src/v3/zodMessageHandler.ts +++ b/packages/core/src/v3/zodMessageHandler.ts @@ -239,15 +239,25 @@ type ZodMessageSenderCallback = export type ZodMessageSenderOptions = { schema: TMessageCatalog; sender: ZodMessageSenderCallback; + canSendMessage?: () => Promise | boolean; }; export class ZodMessageSender { #schema: TMessageCatalog; #sender: ZodMessageSenderCallback; + #canSendMessage?: ZodMessageSenderOptions["canSendMessage"]; constructor(options: ZodMessageSenderOptions) { this.#schema = options.schema; this.#sender = options.sender; + this.#canSendMessage = options.canSendMessage; + } + + public async validateCanSendMessage(): Promise { + if (!this.#canSendMessage) { + return true; + } + return await this.#canSendMessage(); } public async send( From 14d2ea9741dbde41753e38426f29ffb668d46c67 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 23 Jan 2025 23:24:12 +0000 Subject: [PATCH 2/2] Prevent findUnique from bringing down our database --- apps/webapp/app/v3/services/resumeTaskDependency.server.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/webapp/app/v3/services/resumeTaskDependency.server.ts b/apps/webapp/app/v3/services/resumeTaskDependency.server.ts index fc970b31cd..bdbece9778 100644 --- a/apps/webapp/app/v3/services/resumeTaskDependency.server.ts +++ b/apps/webapp/app/v3/services/resumeTaskDependency.server.ts @@ -6,7 +6,7 @@ import { logger } from "~/services/logger.server"; export class ResumeTaskDependencyService extends BaseService { public async call(dependencyId: string, sourceTaskAttemptId: string) { - const dependency = await this._prisma.taskRunDependency.findUnique({ + const dependency = await this._prisma.taskRunDependency.findFirst({ where: { id: dependencyId }, include: { taskRun: {