From 68f8abac558bb2f22673238177552152735a90b6 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Tue, 25 Mar 2025 18:07:29 +0000 Subject: [PATCH 1/3] Improve the new run engine Trigger/Batch trigger service class names --- apps/webapp/app/routes/api.v2.tasks.batch.ts | 4 +- .../services/batchTrigger.server.ts} | 12 +- .../runEngine/services/triggerTask.server.ts | 592 ++++++++++++++++++ apps/webapp/app/services/worker.server.ts | 12 +- .../app/v3/services/triggerTask.server.ts | 4 +- 5 files changed, 608 insertions(+), 16 deletions(-) rename apps/webapp/app/{v3/services/batchTriggerV4.server.ts => runEngine/services/batchTrigger.server.ts} (98%) create mode 100644 apps/webapp/app/runEngine/services/triggerTask.server.ts diff --git a/apps/webapp/app/routes/api.v2.tasks.batch.ts b/apps/webapp/app/routes/api.v2.tasks.batch.ts index e2c13e1aa9..62732ff7ad 100644 --- a/apps/webapp/app/routes/api.v2.tasks.batch.ts +++ b/apps/webapp/app/routes/api.v2.tasks.batch.ts @@ -10,9 +10,9 @@ import { logger } from "~/services/logger.server"; import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { BatchProcessingStrategy } from "~/v3/services/batchTriggerV3.server"; -import { BatchTriggerV4Service } from "~/v3/services/batchTriggerV4.server"; import { OutOfEntitlementError } from "~/v3/services/triggerTask.server"; import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger"; +import { RunEngineBatchTriggerService } from "~/runEngine/services/batchTrigger.server"; const { action, loader } = createActionApiRoute( { @@ -74,7 +74,7 @@ const { action, loader } = createActionApiRoute( ? { traceparent, tracestate } : undefined; - const service = new BatchTriggerV4Service(batchProcessingStrategy ?? undefined); + const service = new RunEngineBatchTriggerService(batchProcessingStrategy ?? undefined); try { const batch = await service.call(authentication.environment, body, { diff --git a/apps/webapp/app/v3/services/batchTriggerV4.server.ts b/apps/webapp/app/runEngine/services/batchTrigger.server.ts similarity index 98% rename from apps/webapp/app/v3/services/batchTriggerV4.server.ts rename to apps/webapp/app/runEngine/services/batchTrigger.server.ts index 1434a8f915..74f16395e4 100644 --- a/apps/webapp/app/v3/services/batchTriggerV4.server.ts +++ b/apps/webapp/app/runEngine/services/batchTrigger.server.ts @@ -15,10 +15,10 @@ import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { getEntitlement } from "~/services/platform.v3.server"; import { workerQueue } from "~/services/worker.server"; -import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../r2.server"; -import { startActiveSpan } from "../tracer.server"; -import { ServiceValidationError, WithRunEngine } from "./baseService.server"; -import { OutOfEntitlementError, TriggerTaskService } from "./triggerTask.server"; +import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../../v3/r2.server"; +import { startActiveSpan } from "../../v3/tracer.server"; +import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server"; +import { OutOfEntitlementError, TriggerTaskService } from "../../v3/services/triggerTask.server"; const PROCESSING_BATCH_SIZE = 50; const ASYNC_BATCH_PROCESS_SIZE_THRESHOLD = 20; @@ -49,7 +49,7 @@ export type BatchTriggerTaskServiceOptions = { /** * Larger batches, used in Run Engine v2 */ -export class BatchTriggerV4Service extends WithRunEngine { +export class RunEngineBatchTriggerService extends WithRunEngine { private _batchProcessingStrategy: BatchProcessingStrategy; constructor( @@ -643,7 +643,7 @@ export class BatchTriggerV4Service extends WithRunEngine { } async #enqueueBatchTaskRun(options: BatchProcessingOptions, tx?: PrismaClientOrTransaction) { - await workerQueue.enqueue("v3.processBatchTaskRunV3", options, { + await workerQueue.enqueue("runengine.processBatchTaskRun", options, { tx, jobKey: `BatchTriggerV3Service.process:${options.batchId}:${options.processingId}`, }); diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts new file mode 100644 index 0000000000..93ceb4a34d --- /dev/null +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -0,0 +1,592 @@ +import { RunDuplicateIdempotencyKeyError, RunEngine } from "@internal/run-engine"; +import { + IOPacket, + packetRequiresOffloading, + SemanticInternalAttributes, + TaskRunError, + taskRunErrorEnhancer, + taskRunErrorToString, + TriggerTaskRequestBody, +} from "@trigger.dev/core/v3"; +import { + BatchId, + RunId, + sanitizeQueueName, + stringifyDuration, +} from "@trigger.dev/core/v3/isomorphic"; +import { Prisma } from "@trigger.dev/database"; +import { env } from "~/env.server"; +import { createTags, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server"; +import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; +import { autoIncrementCounter } from "~/services/autoIncrementCounter.server"; +import { logger } from "~/services/logger.server"; +import { getEntitlement } from "~/services/platform.v3.server"; +import { parseDelay } from "~/utils/delays"; +import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server"; +import { handleMetadataPacket } from "~/utils/packets"; +import { eventRepository } from "../../v3/eventRepository.server"; +import { findCurrentWorkerFromEnvironment } from "../../v3/models/workerDeployment.server"; +import { uploadPacketToObjectStore } from "../../v3/r2.server"; +import { getTaskEventStore } from "../../v3/taskEventStore.server"; +import { isFinalRunStatus } from "../../v3/taskStatus"; +import { startActiveSpan } from "../../v3/tracer.server"; +import { clampMaxDuration } from "../../v3/utils/maxDuration"; +import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server"; +import { + MAX_ATTEMPTS, + OutOfEntitlementError, + TriggerTaskServiceOptions, + TriggerTaskServiceResult, +} from "../../v3/services/triggerTask.server"; +import { WorkerGroupService } from "../../v3/services/worker/workerGroupService.server"; + +export class RunEngineTriggerTaskService extends WithRunEngine { + public async call({ + taskId, + environment, + body, + options = {}, + attempt = 0, + }: { + taskId: string; + environment: AuthenticatedEnvironment; + body: TriggerTaskRequestBody; + options?: TriggerTaskServiceOptions; + attempt?: number; + }): Promise { + return await this.traceWithEnv("call()", environment, async (span) => { + span.setAttribute("taskId", taskId); + span.setAttribute("attempt", attempt); + + if (attempt > MAX_ATTEMPTS) { + throw new ServiceValidationError( + `Failed to trigger ${taskId} after ${MAX_ATTEMPTS} attempts.` + ); + } + + const idempotencyKey = options.idempotencyKey ?? body.options?.idempotencyKey; + const idempotencyKeyExpiresAt = + options.idempotencyKeyExpiresAt ?? + resolveIdempotencyKeyTTL(body.options?.idempotencyKeyTTL) ?? + new Date(Date.now() + 24 * 60 * 60 * 1000 * 30); // 30 days + + const delayUntil = await parseDelay(body.options?.delay); + + const ttl = + typeof body.options?.ttl === "number" + ? stringifyDuration(body.options?.ttl) + : body.options?.ttl ?? (environment.type === "DEVELOPMENT" ? "10m" : undefined); + + const existingRun = idempotencyKey + ? await this._prisma.taskRun.findFirst({ + where: { + runtimeEnvironmentId: environment.id, + idempotencyKey, + taskIdentifier: taskId, + }, + include: { + associatedWaitpoint: true, + }, + }) + : undefined; + + if (existingRun) { + if ( + existingRun.idempotencyKeyExpiresAt && + existingRun.idempotencyKeyExpiresAt < new Date() + ) { + logger.debug("[TriggerTaskService][call] Idempotency key has expired", { + idempotencyKey: options.idempotencyKey, + run: existingRun, + }); + + // Update the existing run to remove the idempotency key + await this._prisma.taskRun.update({ + where: { id: existingRun.id }, + data: { idempotencyKey: null }, + }); + } else { + span.setAttribute("runId", existingRun.friendlyId); + + //We're using `andWait` so we need to block the parent run with a waitpoint + if ( + existingRun.associatedWaitpoint && + body.options?.resumeParentOnCompletion && + body.options?.parentRunId + ) { + await eventRepository.traceEvent( + `${taskId} (cached)`, + { + context: options.traceContext, + spanParentAsLink: options.spanParentAsLink, + parentAsLinkType: options.parentAsLinkType, + kind: "SERVER", + environment, + taskSlug: taskId, + attributes: { + properties: { + [SemanticInternalAttributes.SHOW_ACTIONS]: true, + [SemanticInternalAttributes.ORIGINAL_RUN_ID]: existingRun.friendlyId, + }, + style: { + icon: "task-cached", + }, + runIsTest: body.options?.test ?? false, + batchId: options.batchId ? BatchId.toFriendlyId(options.batchId) : undefined, + idempotencyKey, + runId: existingRun.friendlyId, + }, + incomplete: existingRun.associatedWaitpoint.status === "PENDING", + isError: existingRun.associatedWaitpoint.outputIsError, + immediate: true, + }, + async (event) => { + //log a message + await eventRepository.recordEvent( + `There's an existing run for idempotencyKey: ${idempotencyKey}`, + { + taskSlug: taskId, + environment, + attributes: { + runId: existingRun.friendlyId, + }, + context: options.traceContext, + parentId: event.spanId, + } + ); + //block run with waitpoint + await this._engine.blockRunWithWaitpoint({ + runId: RunId.fromFriendlyId(body.options!.parentRunId!), + waitpoints: existingRun.associatedWaitpoint!.id, + spanIdToComplete: event.spanId, + batch: options?.batchId + ? { + id: options.batchId, + index: options.batchIndex ?? 0, + } + : undefined, + projectId: environment.projectId, + organizationId: environment.organizationId, + tx: this._prisma, + releaseConcurrency: body.options?.releaseConcurrency, + }); + } + ); + } + + return { run: existingRun, isCached: true }; + } + } + + if (environment.type !== "DEVELOPMENT") { + const result = await getEntitlement(environment.organizationId); + if (result && result.hasAccess === false) { + throw new OutOfEntitlementError(); + } + } + + if (!options.skipChecks) { + const queueSizeGuard = await guardQueueSizeLimitsForEnv(this._engine, environment); + + logger.debug("Queue size guard result", { + queueSizeGuard, + environment: { + id: environment.id, + type: environment.type, + organization: environment.organization, + project: environment.project, + }, + }); + + if (!queueSizeGuard.isWithinLimits) { + throw new ServiceValidationError( + `Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}` + ); + } + } + + if ( + body.options?.tags && + typeof body.options.tags !== "string" && + body.options.tags.length > MAX_TAGS_PER_RUN + ) { + throw new ServiceValidationError( + `Runs can only have ${MAX_TAGS_PER_RUN} tags, you're trying to set ${body.options.tags.length}.` + ); + } + + const runFriendlyId = options?.runFriendlyId ?? RunId.generate().friendlyId; + + const payloadPacket = await this.#handlePayloadPacket( + body.payload, + body.options?.payloadType ?? "application/json", + runFriendlyId, + environment + ); + + const metadataPacket = body.options?.metadata + ? handleMetadataPacket( + body.options?.metadata, + body.options?.metadataType ?? "application/json" + ) + : undefined; + + const parentRun = body.options?.parentRunId + ? await this._prisma.taskRun.findFirst({ + where: { id: RunId.fromFriendlyId(body.options.parentRunId) }, + }) + : undefined; + + if (parentRun && isFinalRunStatus(parentRun.status)) { + logger.debug("Parent run is in a terminal state", { + parentRun, + }); + + throw new ServiceValidationError( + `Cannot trigger ${taskId} as the parent run has a status of ${parentRun.status}` + ); + } + + try { + return await eventRepository.traceEvent( + taskId, + { + context: options.traceContext, + spanParentAsLink: options.spanParentAsLink, + parentAsLinkType: options.parentAsLinkType, + kind: "SERVER", + environment, + taskSlug: taskId, + attributes: { + properties: { + [SemanticInternalAttributes.SHOW_ACTIONS]: true, + }, + style: { + icon: options.customIcon ?? "task", + }, + runIsTest: body.options?.test ?? false, + batchId: options.batchId ? BatchId.toFriendlyId(options.batchId) : undefined, + idempotencyKey, + }, + incomplete: true, + immediate: true, + }, + async (event, traceContext, traceparent) => { + const result = await autoIncrementCounter.incrementInTransaction( + `v3-run:${environment.id}:${taskId}`, + async (num, tx) => { + const lockedToBackgroundWorker = body.options?.lockToVersion + ? await tx.backgroundWorker.findFirst({ + where: { + projectId: environment.projectId, + runtimeEnvironmentId: environment.id, + version: body.options?.lockToVersion, + }, + }) + : undefined; + + let queueName = sanitizeQueueName( + await this.#getQueueName(taskId, environment, body.options?.queue?.name) + ); + + // Check that the queuename is not an empty string + if (!queueName) { + queueName = sanitizeQueueName(`task/${taskId}`); + } + + event.setAttribute("queueName", queueName); + span.setAttribute("queueName", queueName); + + //upsert tags + let tags: { id: string; name: string }[] = []; + const bodyTags = + typeof body.options?.tags === "string" ? [body.options.tags] : body.options?.tags; + + if (bodyTags && bodyTags.length > 0) { + const tagRecords = await createTags( + { + tags: bodyTags, + projectId: environment.projectId, + }, + this._prisma + ); + tags = tagRecords.filter(Boolean).map((tr) => ({ id: tr.id, name: tr.name })); + } + + const depth = parentRun ? parentRun.depth + 1 : 0; + + event.setAttribute("runId", runFriendlyId); + span.setAttribute("runId", runFriendlyId); + + const masterQueue = await this.#getMasterQueueForEnvironment(environment); + + const taskRun = await this._engine.trigger( + { + number: num, + friendlyId: runFriendlyId, + environment: environment, + idempotencyKey, + idempotencyKeyExpiresAt: idempotencyKey ? idempotencyKeyExpiresAt : undefined, + taskIdentifier: taskId, + payload: payloadPacket.data ?? "", + payloadType: payloadPacket.dataType, + context: body.context, + traceContext: traceContext, + traceId: event.traceId, + spanId: event.spanId, + parentSpanId: + options.parentAsLinkType === "replay" ? undefined : traceparent?.spanId, + lockedToVersionId: lockedToBackgroundWorker?.id, + taskVersion: lockedToBackgroundWorker?.version, + sdkVersion: lockedToBackgroundWorker?.sdkVersion, + cliVersion: lockedToBackgroundWorker?.cliVersion, + concurrencyKey: body.options?.concurrencyKey, + queue: queueName, + masterQueue: masterQueue, + isTest: body.options?.test ?? false, + delayUntil, + queuedAt: delayUntil ? undefined : new Date(), + maxAttempts: body.options?.maxAttempts, + taskEventStore: getTaskEventStore(), + ttl, + tags, + oneTimeUseToken: options.oneTimeUseToken, + parentTaskRunId: parentRun?.id, + rootTaskRunId: parentRun?.rootTaskRunId ?? parentRun?.id, + batch: options?.batchId + ? { + id: options.batchId, + index: options.batchIndex ?? 0, + } + : undefined, + resumeParentOnCompletion: body.options?.resumeParentOnCompletion, + depth, + metadata: metadataPacket?.data, + metadataType: metadataPacket?.dataType, + seedMetadata: metadataPacket?.data, + seedMetadataType: metadataPacket?.dataType, + maxDurationInSeconds: body.options?.maxDuration + ? clampMaxDuration(body.options.maxDuration) + : undefined, + machine: body.options?.machine, + priorityMs: body.options?.priority ? body.options.priority * 1_000 : undefined, + releaseConcurrency: body.options?.releaseConcurrency, + }, + this._prisma + ); + + const error = taskRun.error ? TaskRunError.parse(taskRun.error) : undefined; + + if (error) { + event.failWithError(error); + } + + return { run: taskRun, error, isCached: false }; + }, + async (_, tx) => { + const counter = await tx.taskRunNumberCounter.findFirst({ + where: { + taskIdentifier: taskId, + environmentId: environment.id, + }, + select: { lastNumber: true }, + }); + + return counter?.lastNumber; + }, + this._prisma + ); + + if (result?.error) { + throw new ServiceValidationError( + taskRunErrorToString(taskRunErrorEnhancer(result.error)) + ); + } + + return result; + } + ); + } catch (error) { + if (error instanceof RunDuplicateIdempotencyKeyError) { + //retry calling this function, because this time it will return the idempotent run + return await this.call({ taskId, environment, body, options, attempt: attempt + 1 }); + } + + // Detect a prisma transaction Unique constraint violation + if (error instanceof Prisma.PrismaClientKnownRequestError) { + logger.debug("TriggerTask: Prisma transaction error", { + code: error.code, + message: error.message, + meta: error.meta, + }); + + if (error.code === "P2002") { + const target = error.meta?.target; + + if ( + Array.isArray(target) && + target.length > 0 && + typeof target[0] === "string" && + target[0].includes("oneTimeUseToken") + ) { + throw new ServiceValidationError( + `Cannot trigger ${taskId} with a one-time use token as it has already been used.` + ); + } else { + throw new ServiceValidationError( + `Cannot trigger ${taskId} as it has already been triggered with the same idempotency key.` + ); + } + } + } + + throw error; + } + }); + } + + async #getMasterQueueForEnvironment(environment: AuthenticatedEnvironment) { + if (environment.type === "DEVELOPMENT") { + return; + } + + const workerGroupService = new WorkerGroupService({ + prisma: this._prisma, + engine: this._engine, + }); + + const workerGroup = await workerGroupService.getDefaultWorkerGroupForProject({ + projectId: environment.projectId, + }); + + if (!workerGroup) { + throw new ServiceValidationError("No worker group found"); + } + + return workerGroup.masterQueue; + } + + async #getQueueName(taskId: string, environment: AuthenticatedEnvironment, queueName?: string) { + if (queueName) { + return queueName; + } + + const defaultQueueName = `task/${taskId}`; + + const worker = await findCurrentWorkerFromEnvironment(environment); + + if (!worker) { + logger.debug("Failed to get queue name: No worker found", { + taskId, + environmentId: environment.id, + }); + + return defaultQueueName; + } + + const task = await this._prisma.backgroundWorkerTask.findFirst({ + where: { + workerId: worker.id, + slug: taskId, + }, + include: { + queue: true, + }, + }); + + if (!task) { + console.log("Failed to get queue name: No task found", { + taskId, + environmentId: environment.id, + }); + + return defaultQueueName; + } + + if (!task.queue) { + console.log("Failed to get queue name: No queue found", { + taskId, + environmentId: environment.id, + queueConfig: task.queueConfig, + }); + + return defaultQueueName; + } + + return task.queue.name ?? defaultQueueName; + } + + async #handlePayloadPacket( + payload: any, + payloadType: string, + pathPrefix: string, + environment: AuthenticatedEnvironment + ) { + return await startActiveSpan("handlePayloadPacket()", async (span) => { + const packet = this.#createPayloadPacket(payload, payloadType); + + if (!packet.data) { + return packet; + } + + const { needsOffloading, size } = packetRequiresOffloading( + packet, + env.TASK_PAYLOAD_OFFLOAD_THRESHOLD + ); + + if (!needsOffloading) { + return packet; + } + + const filename = `${pathPrefix}/payload.json`; + + await uploadPacketToObjectStore(filename, packet.data, packet.dataType, environment); + + return { + data: filename, + dataType: "application/store", + }; + }); + } + + #createPayloadPacket(payload: any, payloadType: string): IOPacket { + if (payloadType === "application/json") { + return { data: JSON.stringify(payload), dataType: "application/json" }; + } + + if (typeof payload === "string") { + return { data: payload, dataType: payloadType }; + } + + return { dataType: payloadType }; + } +} + +function getMaximumSizeForEnvironment(environment: AuthenticatedEnvironment): number | undefined { + if (environment.type === "DEVELOPMENT") { + return environment.organization.maximumDevQueueSize ?? env.MAXIMUM_DEV_QUEUE_SIZE; + } else { + return environment.organization.maximumDeployedQueueSize ?? env.MAXIMUM_DEPLOYED_QUEUE_SIZE; + } +} + +export async function guardQueueSizeLimitsForEnv( + engine: RunEngine, + environment: AuthenticatedEnvironment, + itemsToAdd: number = 1 +) { + const maximumSize = getMaximumSizeForEnvironment(environment); + + if (typeof maximumSize === "undefined") { + return { isWithinLimits: true }; + } + + const queueSize = await engine.lengthOfEnvQueue(environment); + const projectedSize = queueSize + itemsToAdd; + + return { + isWithinLimits: projectedSize <= maximumSize, + maximumSize, + queueSize, + }; +} diff --git a/apps/webapp/app/services/worker.server.ts b/apps/webapp/app/services/worker.server.ts index 217d427230..75a7ded41d 100644 --- a/apps/webapp/app/services/worker.server.ts +++ b/apps/webapp/app/services/worker.server.ts @@ -29,9 +29,9 @@ import { reportInvocationUsage } from "./platform.v3.server"; import { logger } from "./logger.server"; import { BatchProcessingOptions, BatchTriggerV3Service } from "~/v3/services/batchTriggerV3.server"; import { - BatchProcessingOptions as BatchProcessingOptionsV4, - BatchTriggerV4Service, -} from "~/v3/services/batchTriggerV4.server"; + BatchProcessingOptions as RunEngineBatchProcessingOptions, + RunEngineBatchTriggerService, +} from "~/runEngine/services/batchTrigger.server"; const workerCatalog = { scheduleEmail: DeliverEmailSchema, @@ -99,7 +99,7 @@ const workerCatalog = { }), "v3.cancelDevSessionRuns": CancelDevSessionRunsServiceOptions, "v3.processBatchTaskRun": BatchProcessingOptions, - "v3.processBatchTaskRunV3": BatchProcessingOptionsV4, + "runengine.processBatchTaskRun": RunEngineBatchProcessingOptions, }; let workerQueue: ZodWorker; @@ -341,11 +341,11 @@ function getWorkerQueue() { await service.processBatchTaskRun(payload); }, }, - "v3.processBatchTaskRunV3": { + "runengine.processBatchTaskRun": { priority: 0, maxAttempts: 5, handler: async (payload, job) => { - const service = new BatchTriggerV4Service(payload.strategy); + const service = new RunEngineBatchTriggerService(payload.strategy); await service.processBatchTaskRun(payload); }, diff --git a/apps/webapp/app/v3/services/triggerTask.server.ts b/apps/webapp/app/v3/services/triggerTask.server.ts index b7aef6450f..52560ec3d5 100644 --- a/apps/webapp/app/v3/services/triggerTask.server.ts +++ b/apps/webapp/app/v3/services/triggerTask.server.ts @@ -4,7 +4,7 @@ import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { determineEngineVersion } from "../engineVersion.server"; import { WithRunEngine } from "./baseService.server"; import { TriggerTaskServiceV1 } from "./triggerTaskV1.server"; -import { TriggerTaskServiceV2 } from "./triggerTaskV2.server"; +import { RunEngineTriggerTaskService } from "~/runEngine/services/triggerTask.server"; export type TriggerTaskServiceOptions = { idempotencyKey?: string; @@ -78,7 +78,7 @@ export class TriggerTaskService extends WithRunEngine { body: TriggerTaskRequestBody, options: TriggerTaskServiceOptions = {} ): Promise { - const service = new TriggerTaskServiceV2({ + const service = new RunEngineTriggerTaskService({ prisma: this._prisma, engine: this._engine, }); From ea8e9dcfebf3cc7c2eb42893eedd39efdefd8950 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Tue, 25 Mar 2025 22:32:14 +0000 Subject: [PATCH 2/3] centralize queue timestamp logic in EnqueueSystem, adding queueTimestamp support and propagation --- .../runEngine/services/triggerTask.server.ts | 4 + .../run-engine/src/engine/index.ts | 3 +- .../src/engine/systems/checkpointSystem.ts | 1 - .../src/engine/systems/delayedRunSystem.ts | 1 - .../src/engine/systems/enqueueSystem.ts | 4 +- .../engine/systems/pendingVersionSystem.ts | 3 - .../src/engine/systems/waitpointSystem.ts | 2 - .../src/engine/tests/priority.test.ts | 306 ++++++++++++------ .../run-engine/src/engine/types.ts | 1 + references/test-tasks/src/trigger/helpers.ts | 2 +- 10 files changed, 214 insertions(+), 113 deletions(-) diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index 93ceb4a34d..e5933a0ab0 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -371,6 +371,10 @@ export class RunEngineTriggerTaskService extends WithRunEngine { machine: body.options?.machine, priorityMs: body.options?.priority ? body.options.priority * 1_000 : undefined, releaseConcurrency: body.options?.releaseConcurrency, + queueTimestamp: + parentRun && body.options?.resumeParentOnCompletion + ? parentRun.queueTimestamp ?? undefined + : undefined, }, this._prisma ); diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 80fa8f4607..dcd71db84d 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -334,6 +334,7 @@ export class RunEngine { maxAttempts, taskEventStore, priorityMs, + queueTimestamp, ttl, tags, parentTaskRunId, @@ -414,6 +415,7 @@ export class RunEngine { maxAttempts, taskEventStore, priorityMs, + queueTimestamp: queueTimestamp ?? delayUntil ?? new Date(), ttl, tags: tags.length === 0 @@ -520,7 +522,6 @@ export class RunEngine { await this.enqueueSystem.enqueueRun({ run: taskRun, env: environment, - timestamp: Date.now() - taskRun.priorityMs, workerId, runnerId, tx: prisma, diff --git a/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts b/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts index de06fca524..71f54b988e 100644 --- a/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts @@ -161,7 +161,6 @@ export class CheckpointSystem { const newSnapshot = await this.enqueueSystem.enqueueRun({ run, env: run.runtimeEnvironment, - timestamp: run.createdAt.getTime() - run.priorityMs, snapshot: { status: "QUEUED", description: diff --git a/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts b/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts index c954a8d7e1..6c7e410bac 100644 --- a/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts @@ -98,7 +98,6 @@ export class DelayedRunSystem { await this.enqueueSystem.enqueueRun({ run, env: run.runtimeEnvironment, - timestamp: run.createdAt.getTime() - run.priorityMs, batchId: run.batchId ?? undefined, }); diff --git a/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts b/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts index 0ed309792e..a702e516f3 100644 --- a/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts @@ -25,7 +25,6 @@ export class EnqueueSystem { public async enqueueRun({ run, env, - timestamp, tx, snapshot, previousSnapshotId, @@ -37,7 +36,6 @@ export class EnqueueSystem { }: { run: TaskRun; env: MinimalAuthenticatedEnvironment; - timestamp: number; tx?: PrismaClientOrTransaction; snapshot?: { status?: Extract; @@ -81,6 +79,8 @@ export class EnqueueSystem { masterQueues.push(run.secondaryMasterQueue); } + const timestamp = (run.queueTimestamp ?? run.createdAt).getTime() - run.priorityMs; + await this.$.runQueue.enqueueMessage({ env, masterQueues, diff --git a/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts b/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts index 991d8b9aa0..8be87cca7e 100644 --- a/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts @@ -97,9 +97,6 @@ export class PendingVersionSystem { await this.enqueueSystem.enqueueRun({ run: updatedRun, env: backgroundWorker.runtimeEnvironment, - //add to the queue using the original run created time - //this should ensure they're in the correct order in the queue - timestamp: updatedRun.createdAt.getTime() - updatedRun.priorityMs, tx, }); }); diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index ee5d79895d..8d4fe32eab 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -540,7 +540,6 @@ export class WaitpointSystem { await this.enqueueSystem.enqueueRun({ run, env: run.runtimeEnvironment, - timestamp: run.createdAt.getTime() - run.priorityMs, snapshot: { status: "QUEUED_EXECUTING", description: "Run can continue, but is waiting for concurrency", @@ -564,7 +563,6 @@ export class WaitpointSystem { await this.enqueueSystem.enqueueRun({ run, env: run.runtimeEnvironment, - timestamp: run.createdAt.getTime() - run.priorityMs, snapshot: { description: "Run was QUEUED, because all waitpoints are completed", }, diff --git a/internal-packages/run-engine/src/engine/tests/priority.test.ts b/internal-packages/run-engine/src/engine/tests/priority.test.ts index 779238d0cf..2467449585 100644 --- a/internal-packages/run-engine/src/engine/tests/priority.test.ts +++ b/internal-packages/run-engine/src/engine/tests/priority.test.ts @@ -10,130 +10,232 @@ import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js vi.setConfig({ testTimeout: 60_000 }); describe("RunEngine priority", () => { - containerTest("Two runs execute in the correct order", async ({ prisma, redisOptions }) => { - //create environment - const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); - - const engine = new RunEngine({ - prisma, - worker: { - redis: redisOptions, - workers: 1, - tasksPerWorker: 10, - pollIntervalMs: 100, - }, - queue: { - redis: redisOptions, - }, - runLock: { - redis: redisOptions, - }, - machines: { - defaultMachine: "small-1x", + containerTest( + "runs execute in priority order based on priorityMs", + async ({ prisma, redisOptions }) => { + //create environment + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, machines: { - "small-1x": { - name: "small-1x" as const, - cpu: 0.5, - memory: 0.5, - centsPerMs: 0.0001, + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, }, + baseCostInCents: 0.0005, }, - baseCostInCents: 0.0005, - }, - tracer: trace.getTracer("test", "0.0.0"), - }); - - try { - const taskIdentifier = "test-task"; - - //create background worker - const backgroundWorker = await setupBackgroundWorker( - engine, - authenticatedEnvironment, - taskIdentifier - ); - - //the order should be 4,3,1,0,2 - // 0 1 2 3 4 - const priorities = [undefined, 500, -1200, 1000, 4000]; - - //trigger the runs - const runs = await triggerRuns({ - engine, - environment: authenticatedEnvironment, - taskIdentifier, - prisma, - priorities, + tracer: trace.getTracer("test", "0.0.0"), }); - expect(runs.length).toBe(priorities.length); - //check the queue length - const queueLength = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment); - expect(queueLength).toBe(priorities.length); + try { + const taskIdentifier = "test-task"; - //dequeue (expect 4 items because of the negative priority) - const dequeue = await engine.dequeueFromMasterQueue({ - consumerId: "test_12345", - masterQueue: "main", - maxRunCount: 20, - }); - expect(dequeue.length).toBe(4); - expect(dequeue[0].run.friendlyId).toBe(runs[4].friendlyId); - expect(dequeue[1].run.friendlyId).toBe(runs[3].friendlyId); - expect(dequeue[2].run.friendlyId).toBe(runs[1].friendlyId); - expect(dequeue[3].run.friendlyId).toBe(runs[0].friendlyId); - - //wait 2 seconds (because of the negative priority) - await setTimeout(2_000); - const dequeue2 = await engine.dequeueFromMasterQueue({ - consumerId: "test_12345", - masterQueue: "main", - maxRunCount: 20, + //create background worker + const backgroundWorker = await setupBackgroundWorker( + engine, + authenticatedEnvironment, + taskIdentifier + ); + + //the order should be 4,3,1,0,2 + // 0 1 2 3 4 + const priorities = [undefined, 500, -1200, 1000, 4000]; + + //trigger the runs + const runs = await triggerRuns({ + engine, + environment: authenticatedEnvironment, + taskIdentifier, + prisma, + runs: priorities.map((priority, index) => ({ + number: index, + priorityMs: priority, + })), + }); + expect(runs.length).toBe(priorities.length); + + //check the queue length + const queueLength = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment); + expect(queueLength).toBe(priorities.length); + + //dequeue (expect 4 items because of the negative priority) + const dequeue = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: "main", + maxRunCount: 20, + }); + expect(dequeue.length).toBe(4); + expect(dequeue[0].run.friendlyId).toBe(runs[4].friendlyId); + expect(dequeue[1].run.friendlyId).toBe(runs[3].friendlyId); + expect(dequeue[2].run.friendlyId).toBe(runs[1].friendlyId); + expect(dequeue[3].run.friendlyId).toBe(runs[0].friendlyId); + + //wait 2 seconds (because of the negative priority) + await setTimeout(2_000); + const dequeue2 = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: "main", + maxRunCount: 20, + }); + expect(dequeue2.length).toBe(1); + expect(dequeue2[0].run.friendlyId).toBe(runs[2].friendlyId); + } finally { + engine.quit(); + } + } + ); + + containerTest( + "runs execute in order of their queueTimestamp", + async ({ prisma, redisOptions }) => { + //create environment + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), }); - expect(dequeue2.length).toBe(1); - expect(dequeue2[0].run.friendlyId).toBe(runs[2].friendlyId); - } finally { - engine.quit(); + + try { + const taskIdentifier = "test-task"; + + //create background worker + const backgroundWorker = await setupBackgroundWorker( + engine, + authenticatedEnvironment, + taskIdentifier + ); + + //the order should be 2, 3, 1, 4, 0 + const queueTimestamps = [ + undefined, + new Date(3000), + new Date(1000), + new Date(2000), + new Date(4000), + ]; + + //trigger the runs + const runs = await triggerRuns({ + engine, + environment: authenticatedEnvironment, + taskIdentifier, + prisma, + runs: queueTimestamps.map((queueTimestamp, index) => ({ + number: index, + queueTimestamp, + })), + }); + expect(runs.length).toBe(queueTimestamps.length); + + //check the queue length + const queueLength = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment); + expect(queueLength).toBe(queueTimestamps.length); + + //dequeue (expect 4 items because of the negative priority) + const dequeue = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: "main", + maxRunCount: 20, + }); + expect(dequeue.length).toBe(queueTimestamps.length); + expect(dequeue[0].run.friendlyId).toBe(runs[2].friendlyId); + expect(dequeue[1].run.friendlyId).toBe(runs[3].friendlyId); + expect(dequeue[2].run.friendlyId).toBe(runs[1].friendlyId); + expect(dequeue[3].run.friendlyId).toBe(runs[4].friendlyId); + expect(dequeue[4].run.friendlyId).toBe(runs[0].friendlyId); + } finally { + engine.quit(); + } } - }); + ); }); async function triggerRuns({ engine, environment, taskIdentifier, - priorities, + runs, prisma, }: { engine: RunEngine; environment: MinimalAuthenticatedEnvironment; taskIdentifier: string; prisma: PrismaClientOrTransaction; - priorities: (number | undefined)[]; + runs: { + number: number; + priorityMs?: number; + queueTimestamp?: Date; + }[]; }) { - const runs = []; - for (let i = 0; i < priorities.length; i++) { - runs[i] = await engine.trigger( - { - number: i, - friendlyId: generateFriendlyId("run"), - environment, - taskIdentifier, - payload: "{}", - payloadType: "application/json", - context: {}, - traceContext: {}, - traceId: "t12345", - spanId: "s12345", - masterQueue: "main", - queue: `task/${taskIdentifier}`, - isTest: false, - tags: [], - priorityMs: priorities[i], - }, - prisma + const triggeredRuns = []; + for (const run of runs) { + triggeredRuns.push( + await engine.trigger( + { + number: run.number, + friendlyId: generateFriendlyId("run"), + environment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + masterQueue: "main", + queue: `task/${taskIdentifier}`, + isTest: false, + tags: [], + priorityMs: run.priorityMs, + queueTimestamp: run.queueTimestamp, + }, + prisma + ) ); } - return runs; + return triggeredRuns; } diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index c4ad4942c3..abf520573f 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -87,6 +87,7 @@ export type TriggerParams = { maxAttempts?: number; taskEventStore?: string; priorityMs?: number; + queueTimestamp?: Date; ttl?: string; tags: { id: string; name: string }[]; parentTaskRunId?: string; diff --git a/references/test-tasks/src/trigger/helpers.ts b/references/test-tasks/src/trigger/helpers.ts index 8fcffb3f29..ed47c198a1 100644 --- a/references/test-tasks/src/trigger/helpers.ts +++ b/references/test-tasks/src/trigger/helpers.ts @@ -76,7 +76,7 @@ export const retryTask = task({ throw new Error("Error"); } }, - handleError: async (payload, error, { ctx }) => { + handleError: async ({ ctx, payload, error }) => { if (!payload.throwError) { return { skipRetrying: true, From 73cb766434cf7ae2e270ca6240511f84c56230cc Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 26 Mar 2025 09:53:02 +0000 Subject: [PATCH 3/3] Fixing the create tags in the new run engine trigger task service --- apps/webapp/app/models/taskRunTag.server.ts | 50 +- .../runEngine/services/triggerTask.server.ts | 21 +- .../app/v3/services/triggerTaskV2.server.ts | 593 ------------------ references/hello-world/src/trigger/tags.ts | 22 + 4 files changed, 77 insertions(+), 609 deletions(-) delete mode 100644 apps/webapp/app/v3/services/triggerTaskV2.server.ts create mode 100644 references/hello-world/src/trigger/tags.ts diff --git a/apps/webapp/app/models/taskRunTag.server.ts b/apps/webapp/app/models/taskRunTag.server.ts index f676b99a0c..19014078b3 100644 --- a/apps/webapp/app/models/taskRunTag.server.ts +++ b/apps/webapp/app/models/taskRunTag.server.ts @@ -1,11 +1,15 @@ import { prisma } from "~/db.server"; import { generateFriendlyId } from "~/v3/friendlyIdentifiers"; +import { PrismaClientOrTransaction } from "@trigger.dev/database"; export const MAX_TAGS_PER_RUN = 10; -export async function createTag({ tag, projectId }: { tag: string; projectId: string }) { +export async function createTag( + { tag, projectId }: { tag: string; projectId: string }, + prismaClient: PrismaClientOrTransaction = prisma +) { if (tag.trim().length === 0) return; - return prisma.taskRunTag.upsert({ + return prismaClient.taskRunTag.upsert({ where: { projectId_name: { projectId: projectId, @@ -21,6 +25,48 @@ export async function createTag({ tag, projectId }: { tag: string; projectId: st }); } +export type TagRecord = { + id: string; + name: string; +}; + +export async function createTags( + { + tags, + projectId, + }: { + tags: string | string[] | undefined; + projectId: string; + }, + prismaClient: PrismaClientOrTransaction = prisma +): Promise { + if (!tags) { + return []; + } + + const tagsArray = typeof tags === "string" ? [tags] : tags; + + if (tagsArray.length === 0) { + return []; + } + + const tagRecords: TagRecord[] = []; + for (const tag of tagsArray) { + const tagRecord = await createTag( + { + tag, + projectId, + }, + prismaClient + ); + if (tagRecord) { + tagRecords.push({ id: tagRecord.id, name: tagRecord.name }); + } + } + + return tagRecords; +} + export async function getTagsForRunId({ friendlyId, environmentId, diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index e5933a0ab0..a010b6632a 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -298,20 +298,13 @@ export class RunEngineTriggerTaskService extends WithRunEngine { span.setAttribute("queueName", queueName); //upsert tags - let tags: { id: string; name: string }[] = []; - const bodyTags = - typeof body.options?.tags === "string" ? [body.options.tags] : body.options?.tags; - - if (bodyTags && bodyTags.length > 0) { - const tagRecords = await createTags( - { - tags: bodyTags, - projectId: environment.projectId, - }, - this._prisma - ); - tags = tagRecords.filter(Boolean).map((tr) => ({ id: tr.id, name: tr.name })); - } + const tags = await createTags( + { + tags: body.options?.tags, + projectId: environment.projectId, + }, + this._prisma + ); const depth = parentRun ? parentRun.depth + 1 : 0; diff --git a/apps/webapp/app/v3/services/triggerTaskV2.server.ts b/apps/webapp/app/v3/services/triggerTaskV2.server.ts deleted file mode 100644 index d7bc8b9f6a..0000000000 --- a/apps/webapp/app/v3/services/triggerTaskV2.server.ts +++ /dev/null @@ -1,593 +0,0 @@ -import { RunEngine, RunDuplicateIdempotencyKeyError } from "@internal/run-engine"; -import { - IOPacket, - packetRequiresOffloading, - SemanticInternalAttributes, - TaskRunError, - taskRunErrorEnhancer, - taskRunErrorToString, - TriggerTaskRequestBody, -} from "@trigger.dev/core/v3"; -import { - BatchId, - RunId, - sanitizeQueueName, - stringifyDuration, -} from "@trigger.dev/core/v3/isomorphic"; -import { Prisma, TaskRun } from "@trigger.dev/database"; -import { env } from "~/env.server"; -import { createTag, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server"; -import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; -import { autoIncrementCounter } from "~/services/autoIncrementCounter.server"; -import { logger } from "~/services/logger.server"; -import { getEntitlement } from "~/services/platform.v3.server"; -import { parseDelay } from "~/utils/delays"; -import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server"; -import { handleMetadataPacket } from "~/utils/packets"; -import { eventRepository } from "../eventRepository.server"; -import { findCurrentWorkerFromEnvironment } from "../models/workerDeployment.server"; -import { uploadPacketToObjectStore } from "../r2.server"; -import { isFinalRunStatus } from "../taskStatus"; -import { startActiveSpan } from "../tracer.server"; -import { clampMaxDuration } from "../utils/maxDuration"; -import { ServiceValidationError, WithRunEngine } from "./baseService.server"; -import { - MAX_ATTEMPTS, - OutOfEntitlementError, - TriggerTaskServiceOptions, - TriggerTaskServiceResult, -} from "./triggerTask.server"; -import { WorkerGroupService } from "./worker/workerGroupService.server"; -import { getTaskEventStore } from "../taskEventStore.server"; - -/** @deprecated Use TriggerTaskService in `triggerTask.server.ts` instead. */ -export class TriggerTaskServiceV2 extends WithRunEngine { - public async call({ - taskId, - environment, - body, - options = {}, - attempt = 0, - }: { - taskId: string; - environment: AuthenticatedEnvironment; - body: TriggerTaskRequestBody; - options?: TriggerTaskServiceOptions; - attempt?: number; - }): Promise { - return await this.traceWithEnv("call()", environment, async (span) => { - span.setAttribute("taskId", taskId); - span.setAttribute("attempt", attempt); - - if (attempt > MAX_ATTEMPTS) { - throw new ServiceValidationError( - `Failed to trigger ${taskId} after ${MAX_ATTEMPTS} attempts.` - ); - } - - const idempotencyKey = options.idempotencyKey ?? body.options?.idempotencyKey; - const idempotencyKeyExpiresAt = - options.idempotencyKeyExpiresAt ?? - resolveIdempotencyKeyTTL(body.options?.idempotencyKeyTTL) ?? - new Date(Date.now() + 24 * 60 * 60 * 1000 * 30); // 30 days - - const delayUntil = await parseDelay(body.options?.delay); - - const ttl = - typeof body.options?.ttl === "number" - ? stringifyDuration(body.options?.ttl) - : body.options?.ttl ?? (environment.type === "DEVELOPMENT" ? "10m" : undefined); - - const existingRun = idempotencyKey - ? await this._prisma.taskRun.findFirst({ - where: { - runtimeEnvironmentId: environment.id, - idempotencyKey, - taskIdentifier: taskId, - }, - include: { - associatedWaitpoint: true, - }, - }) - : undefined; - - if (existingRun) { - if ( - existingRun.idempotencyKeyExpiresAt && - existingRun.idempotencyKeyExpiresAt < new Date() - ) { - logger.debug("[TriggerTaskService][call] Idempotency key has expired", { - idempotencyKey: options.idempotencyKey, - run: existingRun, - }); - - // Update the existing run to remove the idempotency key - await this._prisma.taskRun.update({ - where: { id: existingRun.id }, - data: { idempotencyKey: null }, - }); - } else { - span.setAttribute("runId", existingRun.friendlyId); - - //We're using `andWait` so we need to block the parent run with a waitpoint - if ( - existingRun.associatedWaitpoint && - body.options?.resumeParentOnCompletion && - body.options?.parentRunId - ) { - await eventRepository.traceEvent( - `${taskId} (cached)`, - { - context: options.traceContext, - spanParentAsLink: options.spanParentAsLink, - parentAsLinkType: options.parentAsLinkType, - kind: "SERVER", - environment, - taskSlug: taskId, - attributes: { - properties: { - [SemanticInternalAttributes.SHOW_ACTIONS]: true, - [SemanticInternalAttributes.ORIGINAL_RUN_ID]: existingRun.friendlyId, - }, - style: { - icon: "task-cached", - }, - runIsTest: body.options?.test ?? false, - batchId: options.batchId ? BatchId.toFriendlyId(options.batchId) : undefined, - idempotencyKey, - runId: existingRun.friendlyId, - }, - incomplete: existingRun.associatedWaitpoint.status === "PENDING", - isError: existingRun.associatedWaitpoint.outputIsError, - immediate: true, - }, - async (event) => { - //log a message - await eventRepository.recordEvent( - `There's an existing run for idempotencyKey: ${idempotencyKey}`, - { - taskSlug: taskId, - environment, - attributes: { - runId: existingRun.friendlyId, - }, - context: options.traceContext, - parentId: event.spanId, - } - ); - //block run with waitpoint - await this._engine.blockRunWithWaitpoint({ - runId: RunId.fromFriendlyId(body.options!.parentRunId!), - waitpoints: existingRun.associatedWaitpoint!.id, - spanIdToComplete: event.spanId, - batch: options?.batchId - ? { - id: options.batchId, - index: options.batchIndex ?? 0, - } - : undefined, - projectId: environment.projectId, - organizationId: environment.organizationId, - tx: this._prisma, - releaseConcurrency: body.options?.releaseConcurrency, - }); - } - ); - } - - return { run: existingRun, isCached: true }; - } - } - - if (environment.type !== "DEVELOPMENT") { - const result = await getEntitlement(environment.organizationId); - if (result && result.hasAccess === false) { - throw new OutOfEntitlementError(); - } - } - - if (!options.skipChecks) { - const queueSizeGuard = await guardQueueSizeLimitsForEnv(this._engine, environment); - - logger.debug("Queue size guard result", { - queueSizeGuard, - environment: { - id: environment.id, - type: environment.type, - organization: environment.organization, - project: environment.project, - }, - }); - - if (!queueSizeGuard.isWithinLimits) { - throw new ServiceValidationError( - `Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}` - ); - } - } - - if ( - body.options?.tags && - typeof body.options.tags !== "string" && - body.options.tags.length > MAX_TAGS_PER_RUN - ) { - throw new ServiceValidationError( - `Runs can only have ${MAX_TAGS_PER_RUN} tags, you're trying to set ${body.options.tags.length}.` - ); - } - - const runFriendlyId = options?.runFriendlyId ?? RunId.generate().friendlyId; - - const payloadPacket = await this.#handlePayloadPacket( - body.payload, - body.options?.payloadType ?? "application/json", - runFriendlyId, - environment - ); - - const metadataPacket = body.options?.metadata - ? handleMetadataPacket( - body.options?.metadata, - body.options?.metadataType ?? "application/json" - ) - : undefined; - - const parentRun = body.options?.parentRunId - ? await this._prisma.taskRun.findFirst({ - where: { id: RunId.fromFriendlyId(body.options.parentRunId) }, - }) - : undefined; - - if (parentRun && isFinalRunStatus(parentRun.status)) { - logger.debug("Parent run is in a terminal state", { - parentRun, - }); - - throw new ServiceValidationError( - `Cannot trigger ${taskId} as the parent run has a status of ${parentRun.status}` - ); - } - - try { - return await eventRepository.traceEvent( - taskId, - { - context: options.traceContext, - spanParentAsLink: options.spanParentAsLink, - parentAsLinkType: options.parentAsLinkType, - kind: "SERVER", - environment, - taskSlug: taskId, - attributes: { - properties: { - [SemanticInternalAttributes.SHOW_ACTIONS]: true, - }, - style: { - icon: options.customIcon ?? "task", - }, - runIsTest: body.options?.test ?? false, - batchId: options.batchId ? BatchId.toFriendlyId(options.batchId) : undefined, - idempotencyKey, - }, - incomplete: true, - immediate: true, - }, - async (event, traceContext, traceparent) => { - const result = await autoIncrementCounter.incrementInTransaction( - `v3-run:${environment.id}:${taskId}`, - async (num, tx) => { - const lockedToBackgroundWorker = body.options?.lockToVersion - ? await tx.backgroundWorker.findFirst({ - where: { - projectId: environment.projectId, - runtimeEnvironmentId: environment.id, - version: body.options?.lockToVersion, - }, - }) - : undefined; - - let queueName = sanitizeQueueName( - await this.#getQueueName(taskId, environment, body.options?.queue?.name) - ); - - // Check that the queuename is not an empty string - if (!queueName) { - queueName = sanitizeQueueName(`task/${taskId}`); - } - - event.setAttribute("queueName", queueName); - span.setAttribute("queueName", queueName); - - //upsert tags - let tags: { id: string; name: string }[] = []; - const bodyTags = - typeof body.options?.tags === "string" ? [body.options.tags] : body.options?.tags; - if (bodyTags && bodyTags.length > 0) { - for (const tag of bodyTags) { - const tagRecord = await createTag({ - tag, - projectId: environment.projectId, - }); - if (tagRecord) { - tags.push(tagRecord); - } - } - } - - const depth = parentRun ? parentRun.depth + 1 : 0; - - event.setAttribute("runId", runFriendlyId); - span.setAttribute("runId", runFriendlyId); - - const masterQueue = await this.#getMasterQueueForEnvironment(environment); - - const taskRun = await this._engine.trigger( - { - number: num, - friendlyId: runFriendlyId, - environment: environment, - idempotencyKey, - idempotencyKeyExpiresAt: idempotencyKey ? idempotencyKeyExpiresAt : undefined, - taskIdentifier: taskId, - payload: payloadPacket.data ?? "", - payloadType: payloadPacket.dataType, - context: body.context, - traceContext: traceContext, - traceId: event.traceId, - spanId: event.spanId, - parentSpanId: - options.parentAsLinkType === "replay" ? undefined : traceparent?.spanId, - lockedToVersionId: lockedToBackgroundWorker?.id, - taskVersion: lockedToBackgroundWorker?.version, - sdkVersion: lockedToBackgroundWorker?.sdkVersion, - cliVersion: lockedToBackgroundWorker?.cliVersion, - concurrencyKey: body.options?.concurrencyKey, - queue: queueName, - masterQueue: masterQueue, - isTest: body.options?.test ?? false, - delayUntil, - queuedAt: delayUntil ? undefined : new Date(), - maxAttempts: body.options?.maxAttempts, - taskEventStore: getTaskEventStore(), - ttl, - tags, - oneTimeUseToken: options.oneTimeUseToken, - parentTaskRunId: parentRun?.id, - rootTaskRunId: parentRun?.rootTaskRunId ?? parentRun?.id, - batch: options?.batchId - ? { - id: options.batchId, - index: options.batchIndex ?? 0, - } - : undefined, - resumeParentOnCompletion: body.options?.resumeParentOnCompletion, - depth, - metadata: metadataPacket?.data, - metadataType: metadataPacket?.dataType, - seedMetadata: metadataPacket?.data, - seedMetadataType: metadataPacket?.dataType, - maxDurationInSeconds: body.options?.maxDuration - ? clampMaxDuration(body.options.maxDuration) - : undefined, - machine: body.options?.machine, - priorityMs: body.options?.priority ? body.options.priority * 1_000 : undefined, - releaseConcurrency: body.options?.releaseConcurrency, - }, - this._prisma - ); - - const error = taskRun.error ? TaskRunError.parse(taskRun.error) : undefined; - - if (error) { - event.failWithError(error); - } - - return { run: taskRun, error, isCached: false }; - }, - async (_, tx) => { - const counter = await tx.taskRunNumberCounter.findFirst({ - where: { - taskIdentifier: taskId, - environmentId: environment.id, - }, - select: { lastNumber: true }, - }); - - return counter?.lastNumber; - }, - this._prisma - ); - - if (result?.error) { - throw new ServiceValidationError( - taskRunErrorToString(taskRunErrorEnhancer(result.error)) - ); - } - - return result; - } - ); - } catch (error) { - if (error instanceof RunDuplicateIdempotencyKeyError) { - //retry calling this function, because this time it will return the idempotent run - return await this.call({ taskId, environment, body, options, attempt: attempt + 1 }); - } - - // Detect a prisma transaction Unique constraint violation - if (error instanceof Prisma.PrismaClientKnownRequestError) { - logger.debug("TriggerTask: Prisma transaction error", { - code: error.code, - message: error.message, - meta: error.meta, - }); - - if (error.code === "P2002") { - const target = error.meta?.target; - - if ( - Array.isArray(target) && - target.length > 0 && - typeof target[0] === "string" && - target[0].includes("oneTimeUseToken") - ) { - throw new ServiceValidationError( - `Cannot trigger ${taskId} with a one-time use token as it has already been used.` - ); - } else { - throw new ServiceValidationError( - `Cannot trigger ${taskId} as it has already been triggered with the same idempotency key.` - ); - } - } - } - - throw error; - } - }); - } - - async #getMasterQueueForEnvironment(environment: AuthenticatedEnvironment) { - if (environment.type === "DEVELOPMENT") { - return; - } - - const workerGroupService = new WorkerGroupService({ - prisma: this._prisma, - engine: this._engine, - }); - - const workerGroup = await workerGroupService.getDefaultWorkerGroupForProject({ - projectId: environment.projectId, - }); - - if (!workerGroup) { - throw new ServiceValidationError("No worker group found"); - } - - return workerGroup.masterQueue; - } - - async #getQueueName(taskId: string, environment: AuthenticatedEnvironment, queueName?: string) { - if (queueName) { - return queueName; - } - - const defaultQueueName = `task/${taskId}`; - - const worker = await findCurrentWorkerFromEnvironment(environment); - - if (!worker) { - logger.debug("Failed to get queue name: No worker found", { - taskId, - environmentId: environment.id, - }); - - return defaultQueueName; - } - - const task = await this._prisma.backgroundWorkerTask.findFirst({ - where: { - workerId: worker.id, - slug: taskId, - }, - include: { - queue: true, - }, - }); - - if (!task) { - console.log("Failed to get queue name: No task found", { - taskId, - environmentId: environment.id, - }); - - return defaultQueueName; - } - - if (!task.queue) { - console.log("Failed to get queue name: No queue found", { - taskId, - environmentId: environment.id, - queueConfig: task.queueConfig, - }); - - return defaultQueueName; - } - - return task.queue.name ?? defaultQueueName; - } - - async #handlePayloadPacket( - payload: any, - payloadType: string, - pathPrefix: string, - environment: AuthenticatedEnvironment - ) { - return await startActiveSpan("handlePayloadPacket()", async (span) => { - const packet = this.#createPayloadPacket(payload, payloadType); - - if (!packet.data) { - return packet; - } - - const { needsOffloading, size } = packetRequiresOffloading( - packet, - env.TASK_PAYLOAD_OFFLOAD_THRESHOLD - ); - - if (!needsOffloading) { - return packet; - } - - const filename = `${pathPrefix}/payload.json`; - - await uploadPacketToObjectStore(filename, packet.data, packet.dataType, environment); - - return { - data: filename, - dataType: "application/store", - }; - }); - } - - #createPayloadPacket(payload: any, payloadType: string): IOPacket { - if (payloadType === "application/json") { - return { data: JSON.stringify(payload), dataType: "application/json" }; - } - - if (typeof payload === "string") { - return { data: payload, dataType: payloadType }; - } - - return { dataType: payloadType }; - } -} - -function getMaximumSizeForEnvironment(environment: AuthenticatedEnvironment): number | undefined { - if (environment.type === "DEVELOPMENT") { - return environment.organization.maximumDevQueueSize ?? env.MAXIMUM_DEV_QUEUE_SIZE; - } else { - return environment.organization.maximumDeployedQueueSize ?? env.MAXIMUM_DEPLOYED_QUEUE_SIZE; - } -} - -export async function guardQueueSizeLimitsForEnv( - engine: RunEngine, - environment: AuthenticatedEnvironment, - itemsToAdd: number = 1 -) { - const maximumSize = getMaximumSizeForEnvironment(environment); - - if (typeof maximumSize === "undefined") { - return { isWithinLimits: true }; - } - - const queueSize = await engine.lengthOfEnvQueue(environment); - const projectedSize = queueSize + itemsToAdd; - - return { - isWithinLimits: projectedSize <= maximumSize, - maximumSize, - queueSize, - }; -} diff --git a/references/hello-world/src/trigger/tags.ts b/references/hello-world/src/trigger/tags.ts new file mode 100644 index 0000000000..c7b97c97cf --- /dev/null +++ b/references/hello-world/src/trigger/tags.ts @@ -0,0 +1,22 @@ +import { logger, task, wait } from "@trigger.dev/sdk"; + +export const tagsTester = task({ + id: "tags-tester", + run: async (payload: any, { ctx }) => { + await tagsChildTask.trigger( + { + tags: ["tag1", "tag2"], + }, + { + tags: ["user:user1", "org:org1"], + } + ); + }, +}); + +export const tagsChildTask = task({ + id: "tags-child", + run: async (payload: any, { ctx }) => { + logger.log("Hello, world from the child", { payload }); + }, +});