diff --git a/apps/webapp/app/v3/legacyRunEngineWorker.server.ts b/apps/webapp/app/v3/legacyRunEngineWorker.server.ts index a5ca5c8483..7a82d5522c 100644 --- a/apps/webapp/app/v3/legacyRunEngineWorker.server.ts +++ b/apps/webapp/app/v3/legacyRunEngineWorker.server.ts @@ -7,6 +7,7 @@ import { singleton } from "~/utils/singleton"; import { TaskRunHeartbeatFailedService } from "./taskRunHeartbeatFailed.server"; import { completeBatchTaskRunItemV3 } from "./services/batchTriggerV3.server"; import { prisma } from "~/db.server"; +import { marqs } from "./marqs/index.server"; function initializeWorker() { const redisOptions = { @@ -49,6 +50,15 @@ function initializeWorker() { maxAttempts: 10, }, }, + scheduleRequeueMessage: { + schema: z.object({ + messageId: z.string(), + }), + visibilityTimeoutMs: 60_000, + retry: { + maxAttempts: 5, + }, + }, }, concurrency: { workers: env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS, @@ -74,6 +84,9 @@ function initializeWorker() { attempt ); }, + scheduleRequeueMessage: async ({ payload }) => { + await marqs.requeueMessageById(payload.messageId); + }, }, }); diff --git a/apps/webapp/app/v3/marqs/constants.server.ts b/apps/webapp/app/v3/marqs/constants.server.ts new file mode 100644 index 0000000000..6ba8dd2a6a --- /dev/null +++ b/apps/webapp/app/v3/marqs/constants.server.ts @@ -0,0 +1,4 @@ +export const MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET = 31_556_952 * 1000; // 1 year +export const MARQS_RETRY_PRIORITY_TIMESTAMP_OFFSET = 15_778_476 * 1000; // 6 months +export const MARQS_DELAYED_REQUEUE_THRESHOLD_IN_MS = 500; +export const MARQS_SCHEDULED_REQUEUE_AVAILABLE_AT_THRESHOLD_IN_MS = 500; diff --git a/apps/webapp/app/v3/marqs/envPriorityDequeuingStrategy.server.ts b/apps/webapp/app/v3/marqs/envPriorityDequeuingStrategy.server.ts deleted file mode 100644 index 7acaf48818..0000000000 --- a/apps/webapp/app/v3/marqs/envPriorityDequeuingStrategy.server.ts +++ /dev/null @@ -1,95 +0,0 @@ -import { EnvQueues, MarQSFairDequeueStrategy, MarQSKeyProducer } from "./types"; - -export type EnvPriorityDequeuingStrategyOptions = { - keys: MarQSKeyProducer; - delegate: MarQSFairDequeueStrategy; -}; - -export class EnvPriorityDequeuingStrategy implements MarQSFairDequeueStrategy { - private _delegate: MarQSFairDequeueStrategy; - - constructor(private options: EnvPriorityDequeuingStrategyOptions) { - this._delegate = options.delegate; - } - - async distributeFairQueuesFromParentQueue( - parentQueue: string, - consumerId: string - ): Promise> { - const envQueues = await this._delegate.distributeFairQueuesFromParentQueue( - parentQueue, - consumerId - ); - - return this.#sortQueuesInEnvironmentsByPriority(envQueues); - } - - #sortQueuesInEnvironmentsByPriority(envs: EnvQueues[]): EnvQueues[] { - return envs.map((env) => { - return this.#sortQueuesInEnvironmentByPriority(env); - }); - } - - // Sorts the queues by priority. A higher priority means the queue should be dequeued first. - // All the queues with the same priority should keep the order they were in the original list. - // So that means if all the queues have the same priority, the order should be preserved. 
- #sortQueuesInEnvironmentByPriority(env: EnvQueues): EnvQueues { - const queues = env.queues; - - // Group queues by their base name (without priority) - const queueGroups = new Map(); - - queues.forEach((queue) => { - const descriptor = this.options.keys.queueDescriptorFromQueue(queue); - const baseQueueName = this.options.keys.queueKey( - descriptor.organization, - descriptor.environment, - descriptor.name, - descriptor.concurrencyKey - ); - - if (!queueGroups.has(baseQueueName)) { - queueGroups.set(baseQueueName, []); - } - - queueGroups.get(baseQueueName)!.push(queue); - }); - - // For each group, keep only the highest priority queue - const resultQueues: string[] = []; - queueGroups.forEach((groupQueues) => { - const sortedGroupQueues = [...groupQueues].sort((a, b) => { - const aPriority = this.#getQueuePriority(a); - const bPriority = this.#getQueuePriority(b); - - if (aPriority === bPriority) { - return 0; - } - - return bPriority - aPriority; - }); - - resultQueues.push(sortedGroupQueues[0]); - }); - - // Sort the final result by priority - const sortedQueues = resultQueues.sort((a, b) => { - const aPriority = this.#getQueuePriority(a); - const bPriority = this.#getQueuePriority(b); - - if (aPriority === bPriority) { - return 0; - } - - return bPriority - aPriority; - }); - - return { envId: env.envId, queues: sortedQueues }; - } - - #getQueuePriority(queue: string): number { - const queueRecord = this.options.keys.queueDescriptorFromQueue(queue); - - return queueRecord.priority ?? 0; - } -} diff --git a/apps/webapp/app/v3/marqs/index.server.ts b/apps/webapp/app/v3/marqs/index.server.ts index f8c36248d0..b85fef788b 100644 --- a/apps/webapp/app/v3/marqs/index.server.ts +++ b/apps/webapp/app/v3/marqs/index.server.ts @@ -10,9 +10,10 @@ import { } from "@opentelemetry/api"; import { SEMATTRS_MESSAGE_ID, - SEMATTRS_MESSAGING_OPERATION, SEMATTRS_MESSAGING_SYSTEM, + SEMATTRS_MESSAGING_OPERATION, } from "@opentelemetry/semantic-conventions"; +import { flattenAttributes } from "@trigger.dev/core/v3"; import Redis, { type Callback, type Result } from "ioredis"; import { env } from "~/env.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; @@ -28,13 +29,19 @@ import { MarQSFairDequeueStrategy, MarQSKeyProducer, MarQSKeyProducerEnv, + MarQSPriorityLevel, MessagePayload, MessageQueueSubscriber, VisibilityTimeoutStrategy, } from "./types"; import { V3LegacyRunEngineWorkerVisibilityTimeout } from "./v3VisibilityTimeout.server"; -import { flattenAttributes } from "@trigger.dev/core/v3"; -import { EnvPriorityDequeuingStrategy } from "./envPriorityDequeuingStrategy.server"; +import { legacyRunEngineWorker } from "../legacyRunEngineWorker.server"; +import { + MARQS_DELAYED_REQUEUE_THRESHOLD_IN_MS, + MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET, + MARQS_RETRY_PRIORITY_TIMESTAMP_OFFSET, + MARQS_SCHEDULED_REQUEUE_AVAILABLE_AT_THRESHOLD_IN_MS, +} from "./constants.server"; const KEY_PREFIX = "marqs:"; @@ -65,11 +72,6 @@ export type MarQSOptions = { subscriber?: MessageQueueSubscriber; }; -export const MarQSPriorityLevel = { - resume: 100, - retry: 10, -} as const; - /** * MarQS - Multitenant Asynchronous Reliable Queueing System (pronounced "markus") */ @@ -199,30 +201,35 @@ export class MarQS { concurrencyKey?: string, timestamp?: number | Date, reserve?: EnqueueMessageReserveConcurrencyOptions, - priority?: number + priority?: MarQSPriorityLevel ) { return await this.#trace( "enqueueMessage", async (span) => { - const messageQueue = this.keys.queueKey(env, queue, concurrencyKey, 
priority); + const messageQueue = this.keys.queueKey(env, queue, concurrencyKey); const parentQueue = this.keys.envSharedQueueKey(env); propagation.inject(context.active(), messageData); + const $timestamp = + typeof timestamp === "undefined" + ? Date.now() + : typeof timestamp === "number" + ? timestamp + : timestamp.getTime(); + const messagePayload: MessagePayload = { version: "1", data: messageData, queue: messageQueue, concurrencyKey, - timestamp: - typeof timestamp === "undefined" - ? Date.now() - : typeof timestamp === "number" - ? timestamp - : timestamp.getTime(), + timestamp: $timestamp, messageId, parentQueue, + priority, + availableAt: Date.now(), + enqueueMethod: "enqueue", }; span.setAttributes({ @@ -292,9 +299,11 @@ export class MarQS { timestamp: timestamp ?? Date.now(), messageId, parentQueue: oldMessage.parentQueue, + priority: oldMessage.priority, + enqueueMethod: "replace", }; - await this.#callReplaceMessage(newMessage); + await this.#saveMessageIfExists(newMessage); }, { kind: SpanKind.CONSUMER, @@ -311,7 +320,7 @@ export class MarQS { messageId: string, messageData: Record, timestamp?: number, - priority?: number + priority?: MarQSPriorityLevel ) { return this.#trace( "requeueMessage", @@ -322,10 +331,8 @@ export class MarQS { return; } - const queue = this.keys.queueKeyFromQueue(oldMessage.queue, priority); - span.setAttributes({ - [SemanticAttributes.QUEUE]: queue, + [SemanticAttributes.QUEUE]: oldMessage.queue, [SemanticAttributes.MESSAGE_ID]: oldMessage.messageId, [SemanticAttributes.CONCURRENCY_KEY]: oldMessage.concurrencyKey, [SemanticAttributes.PARENT_QUEUE]: oldMessage.parentQueue, @@ -336,22 +343,37 @@ export class MarQS { tracestate: oldMessage.data.tracestate, }; + const $timestamp = timestamp ?? Date.now(); + const newMessage: MessagePayload = { version: "1", // preserve original trace context - data: { ...oldMessage.data, ...messageData, ...traceContext, queue }, - queue, + data: { + ...oldMessage.data, + ...messageData, + ...traceContext, + queue: oldMessage.queue, + }, + queue: oldMessage.queue, concurrencyKey: oldMessage.concurrencyKey, - timestamp: timestamp ?? Date.now(), + timestamp: $timestamp, messageId, parentQueue: oldMessage.parentQueue, + priority: priority ?? oldMessage.priority, + availableAt: $timestamp, + enqueueMethod: "requeue", }; await this.options.visibilityTimeoutStrategy.cancelHeartbeat(messageId); - await this.#callRequeueMessage(oldMessage.queue, newMessage); - - await this.options.subscriber?.messageRequeued(oldMessage.queue, newMessage); + // If the message timestamp is enough in the future (e.g. 
more than 500ms from now), + // we will schedule it to be requeued in the future using the legacy run engine redis worker + // If not, we just requeue it immediately + if ($timestamp > Date.now() + MARQS_DELAYED_REQUEUE_THRESHOLD_IN_MS) { + await this.#callDelayedRequeueMessage(newMessage); + } else { + await this.#callRequeueMessage(newMessage); + } }, { kind: SpanKind.CONSUMER, @@ -364,6 +386,54 @@ export class MarQS { ); } + public async requeueMessageById(messageId: string) { + return this.#trace( + "requeueMessageById", + async (span) => { + const message = await this.readMessage(messageId); + + if (!message) { + return; + } + + span.setAttributes({ + [SemanticAttributes.QUEUE]: message.queue, + [SemanticAttributes.MESSAGE_ID]: message.messageId, + [SemanticAttributes.CONCURRENCY_KEY]: message.concurrencyKey, + [SemanticAttributes.PARENT_QUEUE]: message.parentQueue, + }); + + logger.debug(`Requeueing message by id`, { messageId, message, service: this.name }); + + await this.#callRequeueMessage(message); + }, + { + kind: SpanKind.CONSUMER, + attributes: { + [SEMATTRS_MESSAGING_OPERATION]: "requeue_by_id", + [SEMATTRS_MESSAGE_ID]: messageId, + [SEMATTRS_MESSAGING_SYSTEM]: "marqs", + }, + } + ); + } + + async #saveMessage(message: MessagePayload) { + logger.debug(`Saving message`, { message, service: this.name }); + + const messageKey = this.keys.messageKey(message.messageId); + + await this.redis.set(messageKey, JSON.stringify(message)); + } + + async #saveMessageIfExists(message: MessagePayload) { + logger.debug(`Saving message if exists`, { message, service: this.name }); + + const messageKey = this.keys.messageKey(message.messageId); + + await this.redis.set(messageKey, JSON.stringify(message), "XX"); // XX means only set if key exists + } + public async dequeueMessageInEnv(env: AuthenticatedEnvironment) { return this.#trace( "dequeueMessageInEnv", @@ -406,7 +476,11 @@ export class MarQS { [SemanticAttributes.PARENT_QUEUE]: message.parentQueue, attempted_queues: queues.indexOf(messageQueue) + 1, // How many queues we tried before success message_timestamp: message.timestamp, - message_age: Date.now() - message.timestamp, + message_age: this.#calculateMessageAge(message), + message_priority: message.priority, + message_enqueue_method: message.enqueueMethod, + message_available_at: message.availableAt, + ...flattenAttributes(message.data, "message.data"), }); await this.options.subscriber?.messageDequeued(message); @@ -425,7 +499,7 @@ export class MarQS { return; } - await this.options.visibilityTimeoutStrategy.heartbeat( + await this.options.visibilityTimeoutStrategy.startHeartbeat( messageData.messageId, this.visibilityTimeoutInMs ); @@ -496,25 +570,25 @@ export class MarQS { const message = await this.readMessage(messageData.messageId); if (message) { - const ageOfMessageInMs = Date.now() - message.timestamp; - span.setAttributes({ [SEMATTRS_MESSAGE_ID]: message.messageId, [SemanticAttributes.QUEUE]: message.queue, [SemanticAttributes.MESSAGE_ID]: message.messageId, [SemanticAttributes.CONCURRENCY_KEY]: message.concurrencyKey, [SemanticAttributes.PARENT_QUEUE]: message.parentQueue, - age_in_seconds: ageOfMessageInMs / 1000, attempted_queues: attemptedQueues, // How many queues we tried before success attempted_envs: attemptedEnvs, // How many environments we tried before success message_timestamp: message.timestamp, - message_age: Date.now() - message.timestamp, - ...flattenAttributes(message.data, "message"), + message_age: this.#calculateMessageAge(message), + message_priority: 
message.priority, + message_enqueue_method: message.enqueueMethod, + message_available_at: message.availableAt, + ...flattenAttributes(message.data, "message.data"), }); await this.options.subscriber?.messageDequeued(message); - await this.options.visibilityTimeoutStrategy.heartbeat( + await this.options.visibilityTimeoutStrategy.startHeartbeat( messageData.messageId, this.visibilityTimeoutInMs ); @@ -719,6 +793,27 @@ export class MarQS { ); } + #nudgeTimestampForPriority(timestamp: number, priority?: MarQSPriorityLevel) { + if (!priority) { + return timestamp; + } + + switch (priority) { + case "resume": { + return timestamp - MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET; + } + case "retry": { + return timestamp - MARQS_RETRY_PRIORITY_TIMESTAMP_OFFSET; + } + } + } + + #calculateMessageAge(message: MessagePayload) { + const $timestamp = message.availableAt ?? message.timestamp; + + return Date.now() - $timestamp; + } + async #getNackCount(messageId: string): Promise { const result = await this.redis.get(this.keys.nackCounterKey(messageId)); @@ -943,7 +1038,9 @@ export class MarQS { const queueName = message.queue; const messageId = message.messageId; const messageData = JSON.stringify(message); - const messageScore = String(message.timestamp); + const messageScore = String( + this.#nudgeTimestampForPriority(message.timestamp, message.priority) + ); if (!reserve) { logger.debug("Calling enqueueMessage", { @@ -1189,21 +1286,8 @@ export class MarQS { }; } - async #callReplaceMessage(message: MessagePayload) { - logger.debug("Calling replaceMessage", { - messagePayload: message, - service: this.name, - }); - - return this.redis.replaceMessage( - this.keys.messageKey(message.messageId), - JSON.stringify(message) - ); - } - - async #callRequeueMessage(oldQueue: string, message: MessagePayload) { + async #callRequeueMessage(message: MessagePayload) { const queueKey = message.queue; - const oldQueueKey = oldQueue; const parentQueueKey = message.parentQueue; const messageKey = this.keys.messageKey(message.messageId); const queueCurrentConcurrencyKey = this.keys.queueCurrentConcurrencyKeyFromQueue(message.queue); @@ -1213,15 +1297,15 @@ export class MarQS { const envQueueKey = this.keys.envQueueKeyFromQueue(message.queue); const queueName = message.queue; - const oldQueueName = oldQueue; const messageId = message.messageId; const messageData = JSON.stringify(message); - const messageScore = String(message.timestamp); + const messageScore = String( + this.#nudgeTimestampForPriority(message.timestamp, message.priority) + ); logger.debug("Calling requeueMessage", { service: this.name, queueKey, - oldQueueKey, parentQueueKey, messageKey, queueCurrentConcurrencyKey, @@ -1230,7 +1314,6 @@ export class MarQS { envReserveConcurrencyKey, envQueueKey, queueName, - oldQueueName, messageId, messageData, messageScore, @@ -1238,7 +1321,6 @@ export class MarQS { const result = await this.redis.requeueMessage( queueKey, - oldQueueKey, parentQueueKey, messageKey, queueCurrentConcurrencyKey, @@ -1247,7 +1329,6 @@ export class MarQS { envReserveConcurrencyKey, envQueueKey, queueName, - oldQueueName, messageId, messageData, messageScore @@ -1270,6 +1351,69 @@ export class MarQS { result, }); + await this.options.subscriber?.messageRequeued(message); + + return true; + } + + async #callDelayedRequeueMessage(message: MessagePayload) { + const messageKey = this.keys.messageKey(message.messageId); + const queueCurrentConcurrencyKey = this.keys.queueCurrentConcurrencyKeyFromQueue(message.queue); + const queueReserveConcurrencyKey 
= this.keys.queueReserveConcurrencyKeyFromQueue(message.queue); + const envCurrentConcurrencyKey = this.keys.envCurrentConcurrencyKeyFromQueue(message.queue); + const envReserveConcurrencyKey = this.keys.envReserveConcurrencyKeyFromQueue(message.queue); + + const messageId = message.messageId; + const messageData = JSON.stringify(message); + + logger.debug("Calling delayedRequeueMessage", { + service: this.name, + messageKey, + queueCurrentConcurrencyKey, + queueReserveConcurrencyKey, + envCurrentConcurrencyKey, + envReserveConcurrencyKey, + messageId, + messageData, + }); + + const result = await this.redis.delayedRequeueMessage( + messageKey, + queueCurrentConcurrencyKey, + queueReserveConcurrencyKey, + envCurrentConcurrencyKey, + envReserveConcurrencyKey, + messageId, + messageData + ); + + logger.debug("delayedRequeueMessage result", { + service: this.name, + messageKey, + queueCurrentConcurrencyKey, + queueReserveConcurrencyKey, + envCurrentConcurrencyKey, + envReserveConcurrencyKey, + messageId, + messageData, + result, + }); + + logger.debug("Enqueuing scheduleRequeueMessage in LRE worker", { + service: this.name, + message, + }); + + // Schedule the requeue in the future + await legacyRunEngineWorker.enqueue({ + id: `marqs-requeue-${messageId}`, + job: "scheduleRequeueMessage", + payload: { messageId }, + availableAt: new Date( + message.timestamp - MARQS_SCHEDULED_REQUEUE_AVAILABLE_AT_THRESHOLD_IN_MS + ), + }); + return true; } @@ -1642,25 +1786,6 @@ return {messageId, messageScore} -- Return message details `, }); - this.redis.defineCommand("replaceMessage", { - numberOfKeys: 1, - lua: ` -local messageKey = KEYS[1] -local messageData = ARGV[1] - --- Check if message exists -local existingMessage = redis.call('GET', messageKey) - --- Do nothing if it doesn't -if #existingMessage == nil then - return nil -end - --- Replace the message -redis.call('SET', messageKey, messageData, 'GET') - `, - }); - this.redis.defineCommand("acknowledgeMessage", { numberOfKeys: 8, lua: ` @@ -1698,35 +1823,30 @@ redis.call('DEL', messageKey) }); this.redis.defineCommand("requeueMessage", { - numberOfKeys: 9, + numberOfKeys: 8, lua: ` local queueKey = KEYS[1] -local oldQueueKey = KEYS[2] -local parentQueueKey = KEYS[3] -local messageKey = KEYS[4] -local queueCurrentConcurrencyKey = KEYS[5] -local queueReserveConcurrencyKey = KEYS[6] -local envCurrentConcurrencyKey = KEYS[7] -local envReserveConcurrencyKey = KEYS[8] -local envQueueKey = KEYS[9] +local parentQueueKey = KEYS[2] +local messageKey = KEYS[3] +local queueCurrentConcurrencyKey = KEYS[4] +local queueReserveConcurrencyKey = KEYS[5] +local envCurrentConcurrencyKey = KEYS[6] +local envReserveConcurrencyKey = KEYS[7] +local envQueueKey = KEYS[8] local queueName = ARGV[1] -local oldQueueName = ARGV[2] -local messageId = ARGV[3] -local messageData = ARGV[4] -local messageScore = ARGV[5] - --- First remove the message from the old queue -redis.call('ZREM', oldQueueKey, messageId) +local messageId = ARGV[2] +local messageData = ARGV[3] +local messageScore = ARGV[4] -- Write the new message data redis.call('SET', messageKey, messageData) --- Add the message to the new queue with a new score +-- Add the message to the queue with a new score redis.call('ZADD', queueKey, messageScore, messageId) redis.call('ZADD', envQueueKey, messageScore, messageId) --- Rebalance the parent queue (for the new queue) +-- Rebalance the parent queue local earliestMessage = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES') if #earliestMessage == 0 then redis.call('ZREM', 
parentQueueKey, queueName) @@ -1734,20 +1854,37 @@ else redis.call('ZADD', parentQueueKey, earliestMessage[2], queueName) end --- Rebalance the parent queue (for the old queue) -local earliestMessage = redis.call('ZRANGE', oldQueueKey, 0, 0, 'WITHSCORES') -if #earliestMessage == 0 then - redis.call('ZREM', parentQueueKey, oldQueueName) -else - redis.call('ZADD', parentQueueKey, earliestMessage[2], oldQueueName) -end - -- Clear all concurrency sets (combined from both scripts) redis.call('SREM', queueCurrentConcurrencyKey, messageId) redis.call('SREM', queueReserveConcurrencyKey, messageId) redis.call('SREM', envCurrentConcurrencyKey, messageId) redis.call('SREM', envReserveConcurrencyKey, messageId) +return true +`, + }); + + this.redis.defineCommand("delayedRequeueMessage", { + numberOfKeys: 5, + lua: ` +local messageKey = KEYS[1] +local queueCurrentConcurrencyKey = KEYS[2] +local queueReserveConcurrencyKey = KEYS[3] +local envCurrentConcurrencyKey = KEYS[4] +local envReserveConcurrencyKey = KEYS[5] + +local messageId = ARGV[1] +local messageData = ARGV[2] + +-- Write the new message data +redis.call('SET', messageKey, messageData) + +-- Clear all concurrency sets +redis.call('SREM', queueCurrentConcurrencyKey, messageId) +redis.call('SREM', queueReserveConcurrencyKey, messageId) +redis.call('SREM', envCurrentConcurrencyKey, messageId) +redis.call('SREM', envReserveConcurrencyKey, messageId) + return true `, }); @@ -1908,15 +2045,8 @@ declare module "ioredis" { callback?: Callback<[string, string]> ): Result<[string, string] | null, Context>; - replaceMessage( - messageKey: string, - messageData: string, - callback?: Callback - ): Result; - requeueMessage( queueKey: string, - oldQueueKey: string, parentQueueKey: string, messageKey: string, queueCurrentConcurrencyKey: string, @@ -1925,13 +2055,23 @@ declare module "ioredis" { envReserveConcurrencyKey: string, envQueueKey: string, queueName: string, - oldQueueName: string, messageId: string, messageData: string, messageScore: string, callback?: Callback ): Result; + delayedRequeueMessage( + messageKey: string, + queueCurrentConcurrencyKey: string, + queueReserveConcurrencyKey: string, + envCurrentConcurrencyKey: string, + envReserveConcurrencyKey: string, + messageId: string, + messageData: string, + callback?: Callback + ): Result; + acknowledgeMessage( parentQueue: string, messageKey: string, @@ -2004,37 +2144,31 @@ function getMarQSClient() { tracer: trace.getTracer("marqs"), keysProducer, visibilityTimeoutStrategy: new V3LegacyRunEngineWorkerVisibilityTimeout(), - queuePriorityStrategy: new EnvPriorityDequeuingStrategy({ + queuePriorityStrategy: new FairDequeuingStrategy({ + tracer: tracer, + redis, + parentQueueLimit: env.MARQS_SHARED_QUEUE_LIMIT, keys: keysProducer, - delegate: new FairDequeuingStrategy({ - tracer: tracer, - redis, - parentQueueLimit: env.MARQS_SHARED_QUEUE_LIMIT, - keys: keysProducer, - defaultEnvConcurrency: env.DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT, - biases: { - concurrencyLimitBias: env.MARQS_CONCURRENCY_LIMIT_BIAS, - availableCapacityBias: env.MARQS_AVAILABLE_CAPACITY_BIAS, - queueAgeRandomization: env.MARQS_QUEUE_AGE_RANDOMIZATION_BIAS, - }, - reuseSnapshotCount: env.MARQS_REUSE_SNAPSHOT_COUNT, - maximumEnvCount: env.MARQS_MAXIMUM_ENV_COUNT, - }), + defaultEnvConcurrency: env.DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT, + biases: { + concurrencyLimitBias: env.MARQS_CONCURRENCY_LIMIT_BIAS, + availableCapacityBias: env.MARQS_AVAILABLE_CAPACITY_BIAS, + queueAgeRandomization: env.MARQS_QUEUE_AGE_RANDOMIZATION_BIAS, + 
}, + reuseSnapshotCount: env.MARQS_REUSE_SNAPSHOT_COUNT, + maximumEnvCount: env.MARQS_MAXIMUM_ENV_COUNT, }), - envQueuePriorityStrategy: new EnvPriorityDequeuingStrategy({ + envQueuePriorityStrategy: new FairDequeuingStrategy({ + tracer: tracer, + redis, + parentQueueLimit: env.MARQS_DEV_QUEUE_LIMIT, keys: keysProducer, - delegate: new FairDequeuingStrategy({ - tracer: tracer, - redis, - parentQueueLimit: env.MARQS_DEV_QUEUE_LIMIT, - keys: keysProducer, - defaultEnvConcurrency: env.DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT, - biases: { - concurrencyLimitBias: 0.0, - availableCapacityBias: 0.0, - queueAgeRandomization: 0.1, - }, - }), + defaultEnvConcurrency: env.DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT, + biases: { + concurrencyLimitBias: 0.0, + availableCapacityBias: 0.0, + queueAgeRandomization: 0.1, + }, }), workers: 1, redis, diff --git a/apps/webapp/app/v3/marqs/marqsKeyProducer.ts b/apps/webapp/app/v3/marqs/marqsKeyProducer.ts index a4fbfb6a62..673d180229 100644 --- a/apps/webapp/app/v3/marqs/marqsKeyProducer.ts +++ b/apps/webapp/app/v3/marqs/marqsKeyProducer.ts @@ -11,14 +11,12 @@ const constants = { CONCURRENCY_KEY_PART: "ck", MESSAGE_PART: "message", RESERVE_CONCURRENCY_PART: "reserveConcurrency", - PRIORITY_PART: "priority", } as const; const ORG_REGEX = /org:([^:]+):/; const ENV_REGEX = /env:([^:]+):/; const QUEUE_REGEX = /queue:([^:]+)(?::|$)/; const CONCURRENCY_KEY_REGEX = /ck:([^:]+)(?::|$)/; -const PRIORITY_REGEX = /priority:(\d+)(?::|$)/; export class MarQSShortKeyProducer implements MarQSKeyProducer { constructor(private _prefix: string) {} @@ -52,25 +50,13 @@ export class MarQSShortKeyProducer implements MarQSKeyProducer { ].join(":"); } - queueKey( - orgId: string, - envId: string, - queue: string, - concurrencyKey?: string, - priority?: number - ): string; - queueKey( - env: MarQSKeyProducerEnv, - queue: string, - concurrencyKey?: string, - priority?: number - ): string; + queueKey(orgId: string, envId: string, queue: string, concurrencyKey?: string): string; + queueKey(env: MarQSKeyProducerEnv, queue: string, concurrencyKey?: string): string; queueKey( envOrOrgId: MarQSKeyProducerEnv | string, queueOrEnvId: string, queueOrConcurrencyKey: string, - concurrencyKeyOrPriority?: string | number, - priority?: number + concurrencyKeyOrPriority?: string | number ): string { if (typeof envOrOrgId === "string") { return [ @@ -83,7 +69,6 @@ export class MarQSShortKeyProducer implements MarQSKeyProducer { ? this.concurrencyKeySection(concurrencyKeyOrPriority) : [] ) - .concat(typeof priority === "number" && priority ? this.prioritySection(priority) : []) .join(":"); } else { return [ @@ -92,24 +77,18 @@ export class MarQSShortKeyProducer implements MarQSKeyProducer { this.queueSection(queueOrEnvId), ] .concat(queueOrConcurrencyKey ? this.concurrencyKeySection(queueOrConcurrencyKey) : []) - .concat( - typeof concurrencyKeyOrPriority === "number" && concurrencyKeyOrPriority - ? this.prioritySection(concurrencyKeyOrPriority) - : [] - ) .join(":"); } } - queueKeyFromQueue(queue: string, priority?: number): string { + queueKeyFromQueue(queue: string): string { const descriptor = this.queueDescriptorFromQueue(queue); return this.queueKey( descriptor.organization, descriptor.environment, descriptor.name, - descriptor.concurrencyKey, - descriptor.priority ?? priority + descriptor.concurrencyKey ); } @@ -246,16 +225,11 @@ export class MarQSShortKeyProducer implements MarQSKeyProducer { const concurrencyKey = concurrencyKeyMatch ? 
concurrencyKeyMatch[1] : undefined; - const priorityMatch = queue.match(PRIORITY_REGEX); - - const priority = priorityMatch ? parseInt(priorityMatch[1], 10) : undefined; - return { name: queueName, environment: envId, organization: orgId, concurrencyKey, - priority, }; } @@ -280,10 +254,6 @@ export class MarQSShortKeyProducer implements MarQSKeyProducer { return `${constants.CONCURRENCY_KEY_PART}:${concurrencyKey}`; } - private prioritySection(priority: number) { - return `${constants.PRIORITY_PART}:${priority}`; - } - private currentConcurrencyKeyFromDescriptor(descriptor: QueueDescriptor) { return [ this.queueKey( diff --git a/apps/webapp/app/v3/marqs/types.ts b/apps/webapp/app/v3/marqs/types.ts index 284453c77f..98792a3099 100644 --- a/apps/webapp/app/v3/marqs/types.ts +++ b/apps/webapp/app/v3/marqs/types.ts @@ -8,7 +8,6 @@ export type QueueDescriptor = { environment: string; name: string; concurrencyKey?: string; - priority?: number; }; export type MarQSKeyProducerEnv = { @@ -28,21 +27,10 @@ export interface MarQSKeyProducer { envReserveConcurrencyKey(envId: string): string; - queueKey( - orgId: string, - envId: string, - queue: string, - concurrencyKey?: string, - priority?: number - ): string; - queueKey( - env: MarQSKeyProducerEnv, - queue: string, - concurrencyKey?: string, - priority?: number - ): string; + queueKey(orgId: string, envId: string, queue: string, concurrencyKey?: string): string; + queueKey(env: MarQSKeyProducerEnv, queue: string, concurrencyKey?: string): string; - queueKeyFromQueue(queue: string, priority?: number): string; + queueKeyFromQueue(queue: string): string; envQueueKey(env: MarQSKeyProducerEnv): string; envSharedQueueKey(env: MarQSKeyProducerEnv): string; @@ -75,6 +63,10 @@ export type EnvQueues = { queues: string[]; }; +const MarQSPriorityLevel = z.enum(["resume", "retry"]); + +export type MarQSPriorityLevel = z.infer; + export interface MarQSFairDequeueStrategy { distributeFairQueuesFromParentQueue( parentQueue: string, @@ -90,6 +82,9 @@ export const MessagePayload = z.object({ timestamp: z.number(), parentQueue: z.string(), concurrencyKey: z.string().optional(), + priority: MarQSPriorityLevel.optional(), + availableAt: z.number().optional(), + enqueueMethod: z.enum(["enqueue", "requeue", "replace"]).default("enqueue"), }); export type MessagePayload = z.infer; @@ -100,10 +95,11 @@ export interface MessageQueueSubscriber { messageAcked(message: MessagePayload): Promise; messageNacked(message: MessagePayload): Promise; messageReplaced(message: MessagePayload): Promise; - messageRequeued(oldQueue: string, message: MessagePayload): Promise; + messageRequeued(message: MessagePayload): Promise; } export interface VisibilityTimeoutStrategy { + startHeartbeat(messageId: string, timeoutInMs: number): Promise; heartbeat(messageId: string, timeoutInMs: number): Promise; cancelHeartbeat(messageId: string): Promise; } diff --git a/apps/webapp/app/v3/marqs/v2.server.ts b/apps/webapp/app/v3/marqs/v2.server.ts index 3492f42c57..9421ce5fad 100644 --- a/apps/webapp/app/v3/marqs/v2.server.ts +++ b/apps/webapp/app/v3/marqs/v2.server.ts @@ -20,6 +20,10 @@ const KEY_PREFIX = "marqsv2:"; const SHARED_QUEUE_NAME = "sharedQueue"; export class V2VisibilityTimeout implements VisibilityTimeoutStrategy { + async startHeartbeat(messageId: string, timeoutInMs: number): Promise { + RequeueV2Message.enqueue(messageId, new Date(Date.now() + timeoutInMs)); + } + async heartbeat(messageId: string, timeoutInMs: number): Promise { RequeueV2Message.enqueue(messageId, new Date(Date.now() + 
timeoutInMs)); } diff --git a/apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts b/apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts index 7611ee2bce..82ebe90b5f 100644 --- a/apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts +++ b/apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts @@ -3,6 +3,10 @@ import { TaskRunHeartbeatFailedService } from "../taskRunHeartbeatFailed.server" import { VisibilityTimeoutStrategy } from "./types"; export class V3GraphileVisibilityTimeout implements VisibilityTimeoutStrategy { + async startHeartbeat(messageId: string, timeoutInMs: number): Promise { + await TaskRunHeartbeatFailedService.enqueue(messageId, new Date(Date.now() + timeoutInMs)); + } + async heartbeat(messageId: string, timeoutInMs: number): Promise { await TaskRunHeartbeatFailedService.enqueue(messageId, new Date(Date.now() + timeoutInMs)); } @@ -13,7 +17,7 @@ export class V3GraphileVisibilityTimeout implements VisibilityTimeoutStrategy { } export class V3LegacyRunEngineWorkerVisibilityTimeout implements VisibilityTimeoutStrategy { - async heartbeat(messageId: string, timeoutInMs: number): Promise { + async startHeartbeat(messageId: string, timeoutInMs: number): Promise { await legacyRunEngineWorker.enqueue({ id: `heartbeat:${messageId}`, job: "runHeartbeat", @@ -22,6 +26,13 @@ export class V3LegacyRunEngineWorkerVisibilityTimeout implements VisibilityTimeo }); } + async heartbeat(messageId: string, timeoutInMs: number): Promise { + await legacyRunEngineWorker.reschedule( + `heartbeat:${messageId}`, + new Date(Date.now() + timeoutInMs) + ); + } + async cancelHeartbeat(messageId: string): Promise { await legacyRunEngineWorker.ack(`heartbeat:${messageId}`); } diff --git a/apps/webapp/app/v3/services/completeAttempt.server.ts b/apps/webapp/app/v3/services/completeAttempt.server.ts index 635bb2b5e8..324750f1a8 100644 --- a/apps/webapp/app/v3/services/completeAttempt.server.ts +++ b/apps/webapp/app/v3/services/completeAttempt.server.ts @@ -22,7 +22,7 @@ import { env } from "~/env.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { safeJsonParse } from "~/utils/json"; -import { marqs, MarQSPriorityLevel } from "~/v3/marqs/index.server"; +import { marqs } from "~/v3/marqs/index.server"; import { createExceptionPropertiesFromError, eventRepository } from "../eventRepository.server"; import { FailedTaskRunRetryHelper } from "../failedTaskRun.server"; import { FAILED_RUN_STATUSES, isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus"; @@ -467,8 +467,7 @@ export class CompleteAttemptService extends BaseService { const retryViaQueue = () => { logger.debug("[CompleteAttemptService] Enqueuing retry attempt", { runId: run.id }); - // We have to replace a potential RESUME with EXECUTE to correctly retry the attempt - return marqs?.requeueMessage( + return marqs.requeueMessage( run.id, { type: "EXECUTE", @@ -477,7 +476,7 @@ export class CompleteAttemptService extends BaseService { retryCheckpointsDisabled: !this.opts.supportsRetryCheckpoints, }, executionRetry.timestamp, - MarQSPriorityLevel.retry + "retry" ); }; @@ -615,12 +614,7 @@ export class CompleteAttemptService extends BaseService { }); if (environment.type === "DEVELOPMENT") { - marqs.requeueMessage( - taskRunAttempt.taskRunId, - {}, - executionRetry.timestamp, - MarQSPriorityLevel.retry - ); + await marqs.requeueMessage(taskRunAttempt.taskRunId, {}, executionRetry.timestamp, "retry"); return "RETRIED"; } diff --git 
a/apps/webapp/app/v3/services/createCheckpoint.server.ts b/apps/webapp/app/v3/services/createCheckpoint.server.ts index 5b0367c22b..e95f6cedeb 100644 --- a/apps/webapp/app/v3/services/createCheckpoint.server.ts +++ b/apps/webapp/app/v3/services/createCheckpoint.server.ts @@ -2,7 +2,7 @@ import { CoordinatorToPlatformMessages, ManualCheckpointMetadata } from "@trigge import type { InferSocketMessageSchema } from "@trigger.dev/core/v3/zodSocket"; import type { Checkpoint, CheckpointRestoreEvent } from "@trigger.dev/database"; import { logger } from "~/services/logger.server"; -import { marqs, MarQSPriorityLevel } from "~/v3/marqs/index.server"; +import { marqs } from "~/v3/marqs/index.server"; import { generateFriendlyId } from "../friendlyIdentifiers"; import { isFreezableAttemptStatus, isFreezableRunStatus } from "../taskStatus"; import { BaseService } from "./baseService.server"; @@ -266,7 +266,7 @@ export class CreateCheckpointService extends BaseService { checkpointEventId: checkpointEvent.id, }, restoreAtUnixTimeMs, - MarQSPriorityLevel.resume + "resume" ); return { diff --git a/apps/webapp/app/v3/services/resumeBatchRun.server.ts b/apps/webapp/app/v3/services/resumeBatchRun.server.ts index f9c03681ea..af81781547 100644 --- a/apps/webapp/app/v3/services/resumeBatchRun.server.ts +++ b/apps/webapp/app/v3/services/resumeBatchRun.server.ts @@ -1,6 +1,6 @@ import { PrismaClientOrTransaction } from "~/db.server"; import { workerQueue } from "~/services/worker.server"; -import { marqs, MarQSPriorityLevel } from "~/v3/marqs/index.server"; +import { marqs } from "~/v3/marqs/index.server"; import { BaseService } from "./baseService.server"; import { logger } from "~/services/logger.server"; import { BatchTaskRun } from "@trigger.dev/database"; @@ -188,8 +188,7 @@ export class ResumeBatchRunService extends BaseService { dependentTaskAttemptId: dependentTaskAttempt.id, }); - // TODO: use the new priority queue thingie - await marqs?.enqueueMessage( + await marqs.enqueueMessage( environment, dependentRun.queue, dependentRun.id, @@ -206,7 +205,7 @@ export class ResumeBatchRunService extends BaseService { dependentRun.concurrencyKey ?? undefined, dependentRun.queueTimestamp ?? dependentRun.createdAt, undefined, - MarQSPriorityLevel.resume + "resume" ); return "COMPLETED"; @@ -252,7 +251,7 @@ export class ResumeBatchRunService extends BaseService { hasCheckpointEvent: !!batchRun.checkpointEventId, }); - await marqs?.requeueMessage( + await marqs.requeueMessage( dependentRun.id, { type: "RESUME", @@ -269,7 +268,7 @@ export class ResumeBatchRunService extends BaseService { ( dependentTaskAttempt.taskRun.queueTimestamp ?? 
dependentTaskAttempt.taskRun.createdAt ).getTime(), - MarQSPriorityLevel.resume + "resume" ); return "COMPLETED"; diff --git a/apps/webapp/app/v3/services/resumeTaskDependency.server.ts b/apps/webapp/app/v3/services/resumeTaskDependency.server.ts index de99b526c4..b4ccc2368a 100644 --- a/apps/webapp/app/v3/services/resumeTaskDependency.server.ts +++ b/apps/webapp/app/v3/services/resumeTaskDependency.server.ts @@ -1,8 +1,8 @@ import { PrismaClientOrTransaction } from "~/db.server"; +import { logger } from "~/services/logger.server"; import { workerQueue } from "~/services/worker.server"; -import { MarQS, marqs, MarQSPriorityLevel } from "~/v3/marqs/index.server"; +import { marqs } from "~/v3/marqs/index.server"; import { BaseService } from "./baseService.server"; -import { logger } from "~/services/logger.server"; import { TaskRunDependency } from "@trigger.dev/database"; export class ResumeTaskDependencyService extends BaseService { @@ -82,7 +82,7 @@ export class ResumeTaskDependencyService extends BaseService { dependentRun.concurrencyKey ?? undefined, dependentRun.queueTimestamp ?? dependentRun.createdAt, undefined, - MarQSPriorityLevel.resume + "resume" ); } else { logger.debug("Task dependency resume: Attempt is not paused or there's no checkpoint event", { @@ -116,7 +116,7 @@ export class ResumeTaskDependencyService extends BaseService { return; } - await marqs?.requeueMessage( + await marqs.requeueMessage( dependentRun.id, { type: "RESUME", @@ -129,7 +129,7 @@ export class ResumeTaskDependencyService extends BaseService { environmentType: dependency.taskRun.runtimeEnvironment.type, }, (dependentRun.queueTimestamp ?? dependentRun.createdAt).getTime(), - MarQSPriorityLevel.resume + "resume" ); } } diff --git a/apps/webapp/app/v3/services/taskRunConcurrencyTracker.server.ts b/apps/webapp/app/v3/services/taskRunConcurrencyTracker.server.ts index 4f3b0400ad..6b9907bbab 100644 --- a/apps/webapp/app/v3/services/taskRunConcurrencyTracker.server.ts +++ b/apps/webapp/app/v3/services/taskRunConcurrencyTracker.server.ts @@ -121,11 +121,10 @@ class TaskRunConcurrencyTracker implements MessageQueueSubscriber { }); } - async messageRequeued(oldQueue: string, message: MessagePayload): Promise { + async messageRequeued(message: MessagePayload): Promise { logger.debug("TaskRunConcurrencyTracker.messageRequeued()", { data: message.data, messageId: message.messageId, - oldQueue, }); const data = this.getMessageData(message); diff --git a/apps/webapp/test/envPriorityDequeueingStrategy.test.ts b/apps/webapp/test/envPriorityDequeueingStrategy.test.ts deleted file mode 100644 index 8335136923..0000000000 --- a/apps/webapp/test/envPriorityDequeueingStrategy.test.ts +++ /dev/null @@ -1,388 +0,0 @@ -import { describe, expect, it } from "vitest"; -import type { EnvQueues, MarQSFairDequeueStrategy } from "~/v3/marqs/types.js"; -import { EnvPriorityDequeuingStrategy } from "../app/v3/marqs/envPriorityDequeuingStrategy.server.js"; -import { createKeyProducer } from "./utils/marqs.js"; - -const keyProducer = createKeyProducer("test"); - -describe("EnvPriorityDequeuingStrategy", () => { - class TestDelegate implements MarQSFairDequeueStrategy { - constructor(private queues: EnvQueues[]) {} - - async distributeFairQueuesFromParentQueue(): Promise> { - return this.queues; - } - } - - describe("distributeFairQueuesFromParentQueue", () => { - it("should preserve order when all queues have the same priority", async () => { - const inputQueues: EnvQueues[] = [ - { - envId: "env1", - queues: [ - 
"org:org1:env:env1:queue:queue1:priority:1", - "org:org1:env:env1:queue:queue2:priority:1", - "org:org1:env:env1:queue:queue3:priority:1", - ], - }, - ]; - - const delegate = new TestDelegate(inputQueues); - const strategy = new EnvPriorityDequeuingStrategy({ - delegate, - keys: keyProducer, - }); - - const result = await strategy.distributeFairQueuesFromParentQueue("parentQueue", "consumer1"); - - expect(result).toEqual(inputQueues); - expect(result[0].queues).toEqual(inputQueues[0].queues); - }); - - it("should sort queues by priority in descending order", async () => { - const inputQueues: EnvQueues[] = [ - { - envId: "env1", - queues: [ - "org:org1:env:env1:queue:queue1:priority:1", - "org:org1:env:env1:queue:queue2:priority:3", - "org:org1:env:env1:queue:queue3:priority:2", - ], - }, - ]; - - const delegate = new TestDelegate(inputQueues); - const strategy = new EnvPriorityDequeuingStrategy({ - delegate, - keys: keyProducer, - }); - - const result = await strategy.distributeFairQueuesFromParentQueue("parentQueue", "consumer1"); - - expect(result[0].queues).toEqual([ - "org:org1:env:env1:queue:queue2:priority:3", - "org:org1:env:env1:queue:queue3:priority:2", - "org:org1:env:env1:queue:queue1:priority:1", - ]); - }); - - it("should handle queues without priority by treating them as priority 0", async () => { - const inputQueues: EnvQueues[] = [ - { - envId: "env1", - queues: [ - "org:org1:env:env1:queue:queue1", - "org:org1:env:env1:queue:queue2:priority:2", - "org:org1:env:env1:queue:queue3", - ], - }, - ]; - - const delegate = new TestDelegate(inputQueues); - const strategy = new EnvPriorityDequeuingStrategy({ - delegate, - keys: keyProducer, - }); - - const result = await strategy.distributeFairQueuesFromParentQueue("parentQueue", "consumer1"); - - expect(result[0].queues).toEqual([ - "org:org1:env:env1:queue:queue2:priority:2", - "org:org1:env:env1:queue:queue1", - "org:org1:env:env1:queue:queue3", - ]); - }); - - it("should handle multiple environments", async () => { - const inputQueues: EnvQueues[] = [ - { - envId: "env1", - queues: [ - "org:org1:env:env1:queue:queue1:priority:1", - "org:org1:env:env1:queue:queue2:priority:2", - ], - }, - { - envId: "env2", - queues: [ - "org:org1:env:env2:queue:queue3:priority:3", - "org:org1:env:env2:queue:queue4:priority:1", - ], - }, - ]; - - const delegate = new TestDelegate(inputQueues); - const strategy = new EnvPriorityDequeuingStrategy({ - delegate, - keys: keyProducer, - }); - - const result = await strategy.distributeFairQueuesFromParentQueue("parentQueue", "consumer1"); - - expect(result).toHaveLength(2); - expect(result[0].queues).toEqual([ - "org:org1:env:env1:queue:queue2:priority:2", - "org:org1:env:env1:queue:queue1:priority:1", - ]); - expect(result[1].queues).toEqual([ - "org:org1:env:env2:queue:queue3:priority:3", - "org:org1:env:env2:queue:queue4:priority:1", - ]); - }); - - it("should handle negative priorities correctly", async () => { - const inputQueues: EnvQueues[] = [ - { - envId: "env1", - queues: [ - "org:org1:env:env1:queue:queue1:priority:-1", - "org:org1:env:env1:queue:queue2:priority:1", - "org:org1:env:env1:queue:queue3:priority:-2", - ], - }, - ]; - - const delegate = new TestDelegate(inputQueues); - const strategy = new EnvPriorityDequeuingStrategy({ - delegate, - keys: keyProducer, - }); - - const result = await strategy.distributeFairQueuesFromParentQueue("parentQueue", "consumer1"); - - expect(result[0].queues).toEqual([ - "org:org1:env:env1:queue:queue2:priority:1", - 
"org:org1:env:env1:queue:queue1:priority:-1", - "org:org1:env:env1:queue:queue3:priority:-2", - ]); - }); - - it("should maintain stable sort for mixed priority and non-priority queues", async () => { - const inputQueues: EnvQueues[] = [ - { - envId: "env1", - queues: [ - "org:org1:env:env1:queue:queue1", - "org:org1:env:env1:queue:queue2:priority:1", - "org:org1:env:env1:queue:queue3", - "org:org1:env:env1:queue:queue4:priority:1", - "org:org1:env:env1:queue:queue5", - ], - }, - ]; - - const delegate = new TestDelegate(inputQueues); - const strategy = new EnvPriorityDequeuingStrategy({ - delegate, - keys: keyProducer, - }); - - const result = await strategy.distributeFairQueuesFromParentQueue("parentQueue", "consumer1"); - - // Check that queue2 and queue4 (priority 1) maintain their relative order - // and queue1, queue3, and queue5 (priority 0) maintain their relative order - expect(result[0].queues).toEqual([ - "org:org1:env:env1:queue:queue2:priority:1", - "org:org1:env:env1:queue:queue4:priority:1", - "org:org1:env:env1:queue:queue1", - "org:org1:env:env1:queue:queue3", - "org:org1:env:env1:queue:queue5", - ]); - }); - - it("should handle empty queue arrays", async () => { - const inputQueues: EnvQueues[] = [ - { - envId: "env1", - queues: [], - }, - ]; - - const delegate = new TestDelegate(inputQueues); - const strategy = new EnvPriorityDequeuingStrategy({ - delegate, - keys: keyProducer, - }); - - const result = await strategy.distributeFairQueuesFromParentQueue("parentQueue", "consumer1"); - - expect(result).toEqual(inputQueues); - expect(result[0].queues).toEqual([]); - }); - - it("should handle empty environments array", async () => { - const inputQueues: EnvQueues[] = []; - - const delegate = new TestDelegate(inputQueues); - const strategy = new EnvPriorityDequeuingStrategy({ - delegate, - keys: keyProducer, - }); - - const result = await strategy.distributeFairQueuesFromParentQueue("parentQueue", "consumer1"); - - expect(result).toEqual([]); - }); - - it("should handle large priority differences", async () => { - const inputQueues: EnvQueues[] = [ - { - envId: "env1", - queues: [ - "org:org1:env:env1:queue:queue1:priority:1", - "org:org1:env:env1:queue:queue2:priority:1000", - "org:org1:env:env1:queue:queue3:priority:500", - ], - }, - ]; - - const delegate = new TestDelegate(inputQueues); - const strategy = new EnvPriorityDequeuingStrategy({ - delegate, - keys: keyProducer, - }); - - const result = await strategy.distributeFairQueuesFromParentQueue("parentQueue", "consumer1"); - - expect(result[0].queues).toEqual([ - "org:org1:env:env1:queue:queue2:priority:1000", - "org:org1:env:env1:queue:queue3:priority:500", - "org:org1:env:env1:queue:queue1:priority:1", - ]); - }); - - it("should handle multiple environments with mixed priority patterns", async () => { - const inputQueues: EnvQueues[] = [ - { - envId: "env1", - queues: [ - "org:org1:env:env1:queue:queue1", // priority 0 - "org:org1:env:env1:queue:queue2:priority:2", - ], - }, - { - envId: "env2", - queues: [ - "org:org1:env:env2:queue:queue3:priority:1", - "org:org1:env:env2:queue:queue4", // priority 0 - ], - }, - { - envId: "env3", - queues: [ - "org:org1:env:env3:queue:queue5:priority:1", - "org:org1:env:env3:queue:queue6:priority:1", - ], - }, - ]; - - const delegate = new TestDelegate(inputQueues); - const strategy = new EnvPriorityDequeuingStrategy({ - delegate, - keys: keyProducer, - }); - - const result = await strategy.distributeFairQueuesFromParentQueue("parentQueue", "consumer1"); - - 
expect(result).toHaveLength(3); - expect(result[0].queues).toEqual([ - "org:org1:env:env1:queue:queue2:priority:2", - "org:org1:env:env1:queue:queue1", - ]); - expect(result[1].queues).toEqual([ - "org:org1:env:env2:queue:queue3:priority:1", - "org:org1:env:env2:queue:queue4", - ]); - expect(result[2].queues).toEqual([ - "org:org1:env:env3:queue:queue5:priority:1", - "org:org1:env:env3:queue:queue6:priority:1", - ]); - }); - - it("should sort queues with concurrency keys while maintaining priority order", async () => { - const inputQueues: EnvQueues[] = [ - { - envId: "env1", - queues: [ - "org:org1:env:env1:queue:queue1:ck:key1:priority:1", - "org:org1:env:env1:queue:queue2:ck:key1:priority:3", - "org:org1:env:env1:queue:queue3:ck:key2:priority:2", - ], - }, - ]; - - const delegate = new TestDelegate(inputQueues); - const strategy = new EnvPriorityDequeuingStrategy({ - delegate, - keys: keyProducer, - }); - - const result = await strategy.distributeFairQueuesFromParentQueue("parentQueue", "consumer1"); - - expect(result[0].queues).toEqual([ - "org:org1:env:env1:queue:queue2:ck:key1:priority:3", - "org:org1:env:env1:queue:queue3:ck:key2:priority:2", - "org:org1:env:env1:queue:queue1:ck:key1:priority:1", - ]); - }); - - it("should handle mixed queues with and without concurrency keys", async () => { - const inputQueues: EnvQueues[] = [ - { - envId: "env1", - queues: [ - "org:org1:env:env1:queue:queue1:priority:1", - "org:org1:env:env1:queue:queue2:ck:shared-key:priority:2", - "org:org1:env:env1:queue:queue3:ck:shared-key:priority:1", - "org:org1:env:env1:queue:queue4:priority:3", - "org:org1:env:env1:queue:queue5:ck:other-key:priority:2", - ], - }, - ]; - - const delegate = new TestDelegate(inputQueues); - const strategy = new EnvPriorityDequeuingStrategy({ - delegate, - keys: keyProducer, - }); - - const result = await strategy.distributeFairQueuesFromParentQueue("parentQueue", "consumer1"); - - expect(result[0].queues).toEqual([ - "org:org1:env:env1:queue:queue4:priority:3", - "org:org1:env:env1:queue:queue2:ck:shared-key:priority:2", - "org:org1:env:env1:queue:queue5:ck:other-key:priority:2", - "org:org1:env:env1:queue:queue1:priority:1", - "org:org1:env:env1:queue:queue3:ck:shared-key:priority:1", - ]); - }); - - it("should only return the highest priority queue of the same queue", async () => { - const inputQueues: EnvQueues[] = [ - { - envId: "env1", - queues: [ - "org:org1:env:env1:queue:queue1", - "org:org1:env:env1:queue:queue1:priority:1", - "org:org1:env:env1:queue:queue1:priority:2", - "org:org1:env:env1:queue:queue1:priority:3", - "org:org1:env:env1:queue:queue2", - ], - }, - ]; - - const delegate = new TestDelegate(inputQueues); - const strategy = new EnvPriorityDequeuingStrategy({ - delegate, - keys: keyProducer, - }); - - const result = await strategy.distributeFairQueuesFromParentQueue("parentQueue", "consumer1"); - - expect(result[0].queues).toEqual([ - "org:org1:env:env1:queue:queue1:priority:3", - "org:org1:env:env1:queue:queue2", - ]); - }); - }); -}); diff --git a/apps/webapp/test/fairDequeuingStrategy.test.ts b/apps/webapp/test/fairDequeuingStrategy.test.ts index 5d0b8e4af8..94f9f4a3e7 100644 --- a/apps/webapp/test/fairDequeuingStrategy.test.ts +++ b/apps/webapp/test/fairDequeuingStrategy.test.ts @@ -9,6 +9,7 @@ import { } from "./utils/marqs.js"; import { trace } from "@opentelemetry/api"; import { EnvQueues } from "~/v3/marqs/types.js"; +import { MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET } from "~/v3/marqs/constants.server.js"; const tracer = trace.getTracer("test"); 
@@ -870,6 +871,135 @@ describe("FairDequeuingStrategy", () => { expect(selectionPercentages["env-4"] || 0).toBeLessThan(20); } ); + + redisTest( + "should not overly bias picking environments when queue have priority offset ages", + async ({ redis }) => { + const keyProducer = createKeyProducer("test"); + const strategy = new FairDequeuingStrategy({ + tracer, + redis, + keys: keyProducer, + defaultEnvConcurrency: 5, + parentQueueLimit: 100, + seed: "test-seed-max-orgs", + maximumEnvCount: 2, // Only select top 2 orgs + }); + + const now = Date.now(); + + // Setup 4 envs with different queue age profiles + const envSetups = [ + { + envId: "env-1", + queues: [ + { age: 1000 }, // Average age: 1000 + ], + }, + { + envId: "env-2", + queues: [ + { age: 5000 + MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET }, // Average age: 5000 + 1 year + { age: 5000 + MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET }, + ], + }, + { + envId: "env-3", + queues: [ + { age: 2000 }, // Average age: 2000 + { age: 2000 }, + ], + }, + { + envId: "env-4", + queues: [ + { age: 500 }, // Average age: 500 + { age: 500 }, + ], + }, + ]; + + // Setup queues and concurrency for each org + for (const setup of envSetups) { + await setupConcurrency({ + redis, + keyProducer, + env: { id: setup.envId, currentConcurrency: 0, limit: 5 }, + }); + + for (let i = 0; i < setup.queues.length; i++) { + await setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: now - setup.queues[i].age, + queueId: `queue-${setup.envId}-${i}`, + orgId: `org-${setup.envId}`, + envId: setup.envId, + }); + } + } + + // Run multiple iterations to verify consistent behavior + const iterations = 100; + const selectedEnvCounts: Record = {}; + + for (let i = 0; i < iterations; i++) { + const envResult = await strategy.distributeFairQueuesFromParentQueue( + "parent-queue", + `consumer-${i}` + ); + const result = flattenResults(envResult); + + // Track which orgs were included in the result + const selectedEnvs = new Set(result.map((queueId) => keyProducer.envIdFromQueue(queueId))); + + // Verify we never get more than maximumOrgCount orgs + expect(selectedEnvs.size).toBeLessThanOrEqual(2); + + for (const envId of selectedEnvs) { + selectedEnvCounts[envId] = (selectedEnvCounts[envId] || 0) + 1; + } + } + + console.log("Environment selection counts:", selectedEnvCounts); + + // org-2 should be selected most often (highest average age) + expect(selectedEnvCounts["env-2"]).toBeGreaterThan(selectedEnvCounts["env-4"] || 0); + + // org-4 should be selected least often (lowest average age) + const env4Count = selectedEnvCounts["env-4"] || 0; + expect(env4Count).toBeLessThan(selectedEnvCounts["env-2"]); + + // Verify that envs with higher average queue age are selected more frequently + const sortedEnvs = Object.entries(selectedEnvCounts).sort((a, b) => b[1] - a[1]); + console.log("Sorted environment frequencies:", sortedEnvs); + + // The top 2 most frequently selected orgs should be env-2 and env-3 + // as they have the highest average queue ages + const topTwoEnvs = new Set([sortedEnvs[0][0], sortedEnvs[1][0]]); + expect(topTwoEnvs).toContain("env-2"); // Highest average age + expect(topTwoEnvs).toContain("env-3"); // Second highest average age + + // Calculate selection percentages + const totalSelections = Object.values(selectedEnvCounts).reduce((a, b) => a + b, 0); + const selectionPercentages = Object.entries(selectedEnvCounts).reduce( + (acc, [orgId, count]) => { + acc[orgId] = (count / totalSelections) * 100; + return acc; + }, + {} as Record + ); + + 
console.log("Environment selection percentages:", selectionPercentages); + + // Verify that env-2 (highest average age) gets selected in at least 40% of iterations + expect(selectionPercentages["env-2"]).toBeGreaterThan(40); + + // Verify that env-4 (lowest average age) gets selected in less than 20% of iterations + expect(selectionPercentages["env-4"] || 0).toBeLessThan(20); + } + ); }); // Helper function to flatten results for counting diff --git a/apps/webapp/test/marqsKeyProducer.test.ts b/apps/webapp/test/marqsKeyProducer.test.ts index 1f6af24545..59930701f3 100644 --- a/apps/webapp/test/marqsKeyProducer.test.ts +++ b/apps/webapp/test/marqsKeyProducer.test.ts @@ -61,24 +61,6 @@ describe("MarQSShortKeyProducer", () => { "org:765432109876:env:345678901234:queue:testQueue:ck:concKey" ); }); - - it("should include priority when provided", () => { - expect(producer.queueKey(sampleEnv, "testQueue", undefined, 1)).toBe( - "org:765432109876:env:345678901234:queue:testQueue:priority:1" - ); - }); - - it("should NOT include priority when provided with 0", () => { - expect(producer.queueKey(sampleEnv, "testQueue", undefined, 0)).toBe( - "org:765432109876:env:345678901234:queue:testQueue" - ); - }); - - it("should include priority when provided with overloaded call", () => { - expect( - producer.queueKey(sampleEnv.organizationId, sampleEnv.id, "testQueue", undefined, 1) - ).toBe("org:765432109876:env:345678901234:queue:testQueue:priority:1"); - }); }); describe("queueKeyFromQueue", () => { @@ -93,27 +75,6 @@ describe("MarQSShortKeyProducer", () => { producer.queueKeyFromQueue("org:765432109876:env:345678901234:queue:testQueue:ck:concKey") ).toBe("org:765432109876:env:345678901234:queue:testQueue:ck:concKey"); }); - - it("should include priority when provided", () => { - expect( - producer.queueKeyFromQueue("org:765432109876:env:345678901234:queue:testQueue", 1) - ).toBe("org:765432109876:env:345678901234:queue:testQueue:priority:1"); - }); - - it("should NOT include priority when provided with 0", () => { - expect( - producer.queueKeyFromQueue("org:765432109876:env:345678901234:queue:testQueue", 0) - ).toBe("org:765432109876:env:345678901234:queue:testQueue"); - }); - - it("should NOT change the priority when provided", () => { - expect( - producer.queueKeyFromQueue( - "org:765432109876:env:345678901234:queue:testQueue:priority:1", - 10 - ) - ).toBe("org:765432109876:env:345678901234:queue:testQueue:priority:1"); - }); }); describe("envSharedQueueKey", () => { @@ -130,7 +91,7 @@ describe("MarQSShortKeyProducer", () => { describe("queueDescriptorFromQueue", () => { it("should parse queue string into descriptor", () => { - const queueString = "org:123:env:456:queue:testQueue:ck:concKey:priority:5"; + const queueString = "org:123:env:456:queue:testQueue:ck:concKey"; const descriptor = producer.queueDescriptorFromQueue(queueString); expect(descriptor).toEqual({ @@ -138,7 +99,6 @@ describe("MarQSShortKeyProducer", () => { environment: "456", organization: "123", concurrencyKey: "concKey", - priority: 5, }); }); @@ -151,7 +111,6 @@ describe("MarQSShortKeyProducer", () => { environment: "456", organization: "123", concurrencyKey: undefined, - priority: undefined, }); }); @@ -203,22 +162,6 @@ describe("MarQSShortKeyProducer", () => { ) ).toBe("org:765432109876:env:345678901234:queue:testQueue:ck:concKey:currentConcurrency"); }); - - it("should remove the priority bit when provided", () => { - expect( - producer.queueCurrentConcurrencyKeyFromQueue( - 
"org:765432109876:env:345678901234:queue:testQueue:priority:1" - ) - ).toBe("org:765432109876:env:345678901234:queue:testQueue:currentConcurrency"); - }); - - it("should remove the priority bit when provided, but keep the concurrency key", () => { - expect( - producer.queueCurrentConcurrencyKeyFromQueue( - "org:765432109876:env:345678901234:queue:testQueue:ck:concKey:priority:1" - ) - ).toBe("org:765432109876:env:345678901234:queue:testQueue:ck:concKey:currentConcurrency"); - }); }); describe("queueReserveConcurrencyKeyFromQueue", () => { @@ -237,22 +180,6 @@ describe("MarQSShortKeyProducer", () => { ) ).toBe("org:765432109876:env:345678901234:queue:testQueue:reserveConcurrency"); }); - - it("should remove the priority bit when provided", () => { - expect( - producer.queueReserveConcurrencyKeyFromQueue( - "org:765432109876:env:345678901234:queue:testQueue:priority:1" - ) - ).toBe("org:765432109876:env:345678901234:queue:testQueue:reserveConcurrency"); - }); - - it("should remove the priority bit when provided, AND remove the concurrency key", () => { - expect( - producer.queueReserveConcurrencyKeyFromQueue( - "org:765432109876:env:345678901234:queue:testQueue:ck:concKey:priority:1" - ) - ).toBe("org:765432109876:env:345678901234:queue:testQueue:reserveConcurrency"); - }); }); describe("queueConcurrencyLimitKeyFromQueue", () => { @@ -271,22 +198,6 @@ describe("MarQSShortKeyProducer", () => { ) ).toBe("org:765432109876:env:345678901234:queue:testQueue:concurrency"); }); - - it("should remove the priority bit when provided", () => { - expect( - producer.queueConcurrencyLimitKeyFromQueue( - "org:765432109876:env:345678901234:queue:testQueue:priority:1" - ) - ).toBe("org:765432109876:env:345678901234:queue:testQueue:concurrency"); - }); - - it("should remove the priority bit when provided, AND remove the concurrency key", () => { - expect( - producer.queueConcurrencyLimitKeyFromQueue( - "org:765432109876:env:345678901234:queue:testQueue:ck:concKey:priority:1" - ) - ).toBe("org:765432109876:env:345678901234:queue:testQueue:concurrency"); - }); }); describe("envCurrentConcurrencyKey", () => { diff --git a/internal-packages/redis-worker/src/queue.ts b/internal-packages/redis-worker/src/queue.ts index 94309aaaa7..bb7225f396 100644 --- a/internal-packages/redis-worker/src/queue.ts +++ b/internal-packages/redis-worker/src/queue.ts @@ -213,6 +213,10 @@ export class SimpleQueue { } } + async reschedule(id: string, availableAt: Date): Promise { + await this.redis.zadd(`queue`, "XX", availableAt.getTime(), id); + } + async size({ includeFuture = false }: { includeFuture?: boolean } = {}): Promise { try { if (includeFuture) { diff --git a/internal-packages/redis-worker/src/worker.ts b/internal-packages/redis-worker/src/worker.ts index e80b63bc25..899662bb63 100644 --- a/internal-packages/redis-worker/src/worker.ts +++ b/internal-packages/redis-worker/src/worker.ts @@ -163,6 +163,26 @@ class Worker { ); } + /** + * Reschedules an existing job to a new available date. + * If the job isn't in the queue, it will be ignored. 
+ */ + reschedule(id: string, availableAt: Date) { + return startSpan( + this.tracer, + "reschedule", + async (span) => { + return this.queue.reschedule(id, availableAt); + }, + { + kind: SpanKind.PRODUCER, + attributes: { + job_id: id, + }, + } + ); + } + ack(id: string) { return startSpan( this.tracer, diff --git a/references/test-tasks/src/trigger/helpers.ts b/references/test-tasks/src/trigger/helpers.ts new file mode 100644 index 0000000000..ed3d8e7686 --- /dev/null +++ b/references/test-tasks/src/trigger/helpers.ts @@ -0,0 +1,203 @@ +import { BatchResult, queue, task, wait } from "@trigger.dev/sdk/v3"; + +export const recursiveTask = task({ + id: "recursive-task", + queue: { + concurrencyLimit: 1, + }, + retry: { + maxAttempts: 1, + }, + run: async ( + { delayMs, depth, useBatch = false }: { delayMs: number; depth: number; useBatch: boolean }, + { ctx } + ) => { + if (depth === 0) { + return; + } + + await new Promise((resolve) => setTimeout(resolve, delayMs)); + + if (useBatch) { + const batchResult = await recursiveTask.batchTriggerAndWait([ + { + payload: { delayMs, depth: depth - 1, useBatch }, + options: { tags: ["recursive"] }, + }, + ]); + + const firstRun = batchResult.runs[0] as any; + + return { + ok: firstRun.ok, + }; + } else { + const result = (await recursiveTask.triggerAndWait({ + delayMs, + depth: depth - 1, + useBatch, + })) as any; + + return { + ok: result.ok, + }; + } + }, +}); + +export const singleQueue = queue({ + name: "single-queue", + concurrencyLimit: 1, +}); + +export const delayTask = task({ + id: "delay-task", + retry: { + maxAttempts: 1, + }, + run: async (payload: { delayMs: number }, { ctx }) => { + await new Promise((resolve) => setTimeout(resolve, payload.delayMs)); + }, +}); + +export const retryTask = task({ + id: "retry-task", + queue: singleQueue, + retry: { + maxAttempts: 2, + }, + run: async ( + payload: { delayMs: number; throwError: boolean; failureCount: number; retryDelayMs?: number }, + { ctx } + ) => { + await new Promise((resolve) => setTimeout(resolve, payload.delayMs)); + + if (payload.throwError && ctx.attempt.number <= payload.failureCount) { + throw new Error("Error"); + } + }, + handleError: async (payload, error, { ctx }) => { + if (!payload.throwError) { + return { + skipRetrying: true, + }; + } else { + return { + retryDelayInMs: payload.retryDelayMs, + }; + } + }, +}); + +export const durationWaitTask = task({ + id: "duration-wait-task", + queue: { + concurrencyLimit: 1, + }, + run: async ( + { + waitDurationInSeconds = 5, + doWait = true, + }: { waitDurationInSeconds: number; doWait: boolean }, + { ctx } + ) => { + if (doWait) { + await wait.for({ seconds: waitDurationInSeconds }); + } else { + await new Promise((resolve) => setTimeout(resolve, waitDurationInSeconds * 1000)); + } + }, +}); + +export const resumeParentTask = task({ + id: "resume-parent-task", + queue: { + concurrencyLimit: 1, + }, + run: async ( + { + delayMs = 5_000, + triggerChildTask, + useBatch = false, + }: { delayMs: number; triggerChildTask: boolean; useBatch: boolean }, + { ctx } + ) => { + if (triggerChildTask) { + if (useBatch) { + const batchResult = await resumeChildTask.batchTriggerAndWait([ + { + payload: { delayMs }, + options: { tags: ["resume-child"] }, + }, + ]); + + unwrapBatchResult(batchResult); + } else { + await resumeChildTask.triggerAndWait({ delayMs }, { tags: ["resume-child"] }).unwrap(); + } + } else { + await new Promise((resolve) => setTimeout(resolve, delayMs)); + } + }, +}); + +export const resumeChildTask = task({ + id: 
"resume-child-task", + run: async (payload: { delayMs: number }, { ctx }) => { + await new Promise((resolve) => setTimeout(resolve, payload.delayMs)); + }, +}); + +export const genericParentTask = task({ + id: "generic-parent-task", + run: async ( + { + delayMs = 5_000, + triggerChildTask, + useBatch = false, + }: { delayMs: number; triggerChildTask: boolean; useBatch: boolean }, + { ctx } + ) => { + if (triggerChildTask) { + if (useBatch) { + const batchResult = await genericChildTask.batchTriggerAndWait([ + { + payload: { delayMs }, + options: { tags: ["resume-child"] }, + }, + ]); + + return unwrapBatchResult(batchResult); + } else { + await genericChildTask.triggerAndWait({ delayMs }, { tags: ["resume-child"] }).unwrap(); + } + } else { + await new Promise((resolve) => setTimeout(resolve, delayMs)); + } + }, +}); + +function unwrapBatchResult(batchResult: BatchResult) { + if (batchResult.runs.some((run) => !run.ok)) { + throw new Error(`Child task failed: ${batchResult.runs.find((run) => !run.ok)?.error}`); + } + + return batchResult.runs; +} + +export const genericChildTask = task({ + id: "generic-child-task", + run: async (payload: { delayMs: number }, { ctx }) => { + await new Promise((resolve) => setTimeout(resolve, payload.delayMs)); + }, +}); + +export const eventLoopLagTask = task({ + id: "event-loop-lag-task", + run: async ({ delayMs }: { delayMs: number }, { ctx }) => { + const start = Date.now(); + while (Date.now() - start < delayMs) { + // Do nothing + } + }, +}); diff --git a/references/test-tasks/src/trigger/test-heartbeats.ts b/references/test-tasks/src/trigger/test-heartbeats.ts new file mode 100644 index 0000000000..c62dcc438e --- /dev/null +++ b/references/test-tasks/src/trigger/test-heartbeats.ts @@ -0,0 +1,47 @@ +import { waitForRunStatus } from "@/utils.js"; +import { logger, task } from "@trigger.dev/sdk/v3"; +import assert from "assert"; +import { genericChildTask } from "./helpers.js"; + +export const describeHeartbeats = task({ + id: "describe/heartbeats", + retry: { + maxAttempts: 1, + }, + run: async ( + { visibilityTimeoutSeconds = 100 }: { visibilityTimeoutSeconds?: number }, + { ctx } + ) => { + await testHeartbeats.triggerAndWait({ visibilityTimeoutSeconds }).unwrap(); + }, +}); + +export const testHeartbeats = task({ + id: "test/heartbeats", + retry: { + maxAttempts: 1, + }, + run: async ( + { visibilityTimeoutSeconds = 100 }: { visibilityTimeoutSeconds?: number }, + { ctx } + ) => { + const run = await genericChildTask.trigger({ + delayMs: visibilityTimeoutSeconds * 1_000 + 5 * 1000, + }); + + await waitForRunStatus(run.id, ["EXECUTING"]); + + logger.info("Heartbeat test: run is executing"); + + const completedRun = await waitForRunStatus( + run.id, + ["COMPLETED", "FAILED", "SYSTEM_FAILURE", "CRASHED"], + visibilityTimeoutSeconds + 30, + 5_000 + ); + + assert(completedRun.status === "COMPLETED", `Run failed with status ${completedRun.status}`); + + logger.info("Heartbeat test: run is completed"); + }, +}); diff --git a/references/test-tasks/src/trigger/test-reserve-concurrency-system.ts b/references/test-tasks/src/trigger/test-reserve-concurrency-system.ts index bf33753e8d..05b8eba9a4 100644 --- a/references/test-tasks/src/trigger/test-reserve-concurrency-system.ts +++ b/references/test-tasks/src/trigger/test-reserve-concurrency-system.ts @@ -1,10 +1,18 @@ -import { BatchResult, logger, queue, task, wait } from "@trigger.dev/sdk/v3"; +import { logger, task } from "@trigger.dev/sdk/v3"; import assert from "assert"; import { + getEnvironmentStats, 
updateEnvironmentConcurrencyLimit, waitForRunStatus, - getEnvironmentStats, } from "../utils.js"; +import { + retryTask, + resumeParentTask, + durationWaitTask, + delayTask, + genericParentTask, + recursiveTask, +} from "./helpers.js"; export const describeReserveConcurrencySystem = task({ id: "describe/reserve-concurrency-system", @@ -12,7 +20,7 @@ export const describeReserveConcurrencySystem = task({ maxAttempts: 1, }, run: async (payload: any, { ctx }) => { - await testRetryPriority.triggerAndWait({ holdDelayMs: 10_000 }).unwrap(); + await testRetryPriority.triggerAndWait({}).unwrap(); logger.info("✅ Tested retry priority, now testing resume priority"); @@ -58,9 +66,11 @@ export const testRetryPriority = task({ retry: { maxAttempts: 1, }, - run: async ({ holdDelayMs = 10_000 }: { holdDelayMs: number }, { ctx }) => { + run: async (payload: any, { ctx }) => { const startEnvStats = await getEnvironmentStats(ctx.environment.id); + const retryDelayMs = ctx.environment.type === "DEVELOPMENT" ? 10_000 : 61_000; + // We need to test the reserve concurrency system // 1. Retries are prioritized over new runs // Setup: Trigger a run that fails and will re-attempt in 5 seconds @@ -70,7 +80,7 @@ export const testRetryPriority = task({ // Once the retry completes successfully, the 3rd run should be dequeued const failureRun = await retryTask.trigger( - { delayMs: 0, throwError: true, failureCount: 1 }, + { delayMs: 0, throwError: true, failureCount: 1, retryDelayMs }, { tags: ["failure"] } ); await waitForRunStatus(failureRun.id, ["EXECUTING", "REATTEMPTING"]); @@ -78,7 +88,7 @@ export const testRetryPriority = task({ logger.info("Failure run is executing, triggering a run that will hit the concurrency limit"); const holdRun = await retryTask.trigger( - { delayMs: holdDelayMs, throwError: false, failureCount: 0 }, + { delayMs: retryDelayMs + 2_000, throwError: false, failureCount: 0 }, { tags: ["hold"] } ); await waitForRunStatus(holdRun.id, ["EXECUTING"]); @@ -94,10 +104,12 @@ export const testRetryPriority = task({ const completedFailureRun = await waitForRunStatus(failureRun.id, ["COMPLETED"]); const completedQueuedRun = await waitForRunStatus(queuedRun.id, ["COMPLETED"]); + const completedHoldRun = await waitForRunStatus(holdRun.id, ["COMPLETED"]); logger.info("Runs completed", { completedFailureRun, completedQueuedRun, + completedHoldRun, }); // Now we need to assert the completedFailureRun.completedAt is before completedQueuedRun.completedAt @@ -106,6 +118,12 @@ export const testRetryPriority = task({ "Failure run should complete before queued run" ); + // Lets also make sure the completedFailureRun.finishedAt is AFTER the completedHoldRun.finishedAt + assert( + completedFailureRun.finishedAt! 
> completedHoldRun.finishedAt!,
+      "Failure run should complete after hold run"
+    );
+
     // Now lets make sure all the runs are completed
     await waitForRunStatus(holdRun.id, ["COMPLETED"]);
 
@@ -191,11 +209,16 @@ export const testResumeDurationPriority = task({
       "Resume run is executing, triggering a run that will hold the concurrency until both the resume run and the queued run are in the queue"
     );
 
+    let holdRunId: string | undefined;
+
     if (ctx.environment.type !== "DEVELOPMENT") {
       const holdRun = await durationWaitTask.trigger(
         { waitDurationInSeconds: waitDurationInSeconds + 10, doWait: false },
         { tags: ["hold"] }
       );
+
+      holdRunId = holdRun.id;
+
       await waitForRunStatus(holdRun.id, ["EXECUTING"]);
 
       logger.info("Hold run is executing, triggering a run that should be queued");
@@ -221,6 +244,16 @@ export const testResumeDurationPriority = task({
       "Resume run should complete before queued run"
     );
 
+    if (holdRunId) {
+      const completedHoldRun = await waitForRunStatus(holdRunId, ["COMPLETED"]);
+
+      // Lets also make sure the completedResumeRun.finishedAt is AFTER the completedHoldRun.finishedAt
+      assert(
+        completedResumeRun.finishedAt! > completedHoldRun.finishedAt!,
+        "Resume run should complete after hold run"
+      );
+    }
+
     const envStats = await getEnvironmentStats(ctx.environment.id);
 
     assert(
@@ -275,7 +308,7 @@ export const testEnvReserveConcurrency = task({
     // But because the parent task triggers a child task, the env reserve concurrency will allow the child task to execute
     logger.info("Parent task is executing, waiting for child task to complete");
 
-    await waitForRunStatus(parentRun.id, ["COMPLETED"], 10); // timeout after 10 seconds, to ensure the child task finished before the delay runs
+    await waitForRunStatus(parentRun.id, ["COMPLETED"], 20); // timeout after 20 seconds, to ensure the child task finished before the delay runs
 
     logger.info(
       "Parent task completed, which means the child task completed.
Now waiting for the hold tasks to complete" @@ -332,7 +365,7 @@ export const testQueueReserveConcurrency = task({ { tags: ["root"] } ); - const completedRootRun = await waitForRunStatus(rootRecursiveRun.id, ["COMPLETED"], 20); + const completedRootRun = await waitForRunStatus(rootRecursiveRun.id, ["COMPLETED"]); assert(completedRootRun.status === "COMPLETED", "Root recursive run should be completed"); @@ -341,7 +374,7 @@ export const testQueueReserveConcurrency = task({ { tags: ["failing-root"] } ); - const failedRootRun = await waitForRunStatus(failingRootRecursiveRun.id, ["COMPLETED"], 20); + const failedRootRun = await waitForRunStatus(failingRootRecursiveRun.id, ["COMPLETED"]); assert(!failedRootRun.output?.ok, "Child of failing root run should fail"); @@ -363,183 +396,3 @@ export const testQueueReserveConcurrency = task({ ); }, }); - -export const recursiveTask = task({ - id: "recursive-task", - queue: { - concurrencyLimit: 1, - }, - retry: { - maxAttempts: 1, - }, - run: async ( - { delayMs, depth, useBatch = false }: { delayMs: number; depth: number; useBatch: boolean }, - { ctx } - ) => { - if (depth === 0) { - return; - } - - await new Promise((resolve) => setTimeout(resolve, delayMs)); - - if (useBatch) { - const batchResult = await recursiveTask.batchTriggerAndWait([ - { - payload: { delayMs, depth: depth - 1, useBatch }, - options: { tags: ["recursive"] }, - }, - ]); - - const firstRun = batchResult.runs[0] as any; - - return { - ok: firstRun.ok, - }; - } else { - const result = (await recursiveTask.triggerAndWait({ - delayMs, - depth: depth - 1, - useBatch, - })) as any; - - return { - ok: result.ok, - }; - } - }, -}); - -export const singleQueue = queue({ - name: "single-queue", - concurrencyLimit: 1, -}); - -export const delayTask = task({ - id: "delay-task", - retry: { - maxAttempts: 1, - }, - run: async (payload: { delayMs: number }, { ctx }) => { - await new Promise((resolve) => setTimeout(resolve, payload.delayMs)); - }, -}); - -export const retryTask = task({ - id: "retry-task", - queue: singleQueue, - retry: { - maxAttempts: 10, - minTimeoutInMs: 5_000, // Will retry in 5 seconds - maxTimeoutInMs: 5_000, - }, - run: async (payload: { delayMs: number; throwError: boolean; failureCount: number }, { ctx }) => { - await new Promise((resolve) => setTimeout(resolve, payload.delayMs)); - - if (payload.throwError && ctx.attempt.number <= payload.failureCount) { - throw new Error("Error"); - } - }, -}); - -export const durationWaitTask = task({ - id: "duration-wait-task", - queue: { - concurrencyLimit: 1, - }, - run: async ( - { - waitDurationInSeconds = 5, - doWait = true, - }: { waitDurationInSeconds: number; doWait: boolean }, - { ctx } - ) => { - if (doWait) { - await wait.for({ seconds: waitDurationInSeconds }); - } else { - await new Promise((resolve) => setTimeout(resolve, waitDurationInSeconds * 1000)); - } - }, -}); - -export const resumeParentTask = task({ - id: "resume-parent-task", - queue: { - concurrencyLimit: 1, - }, - run: async ( - { - delayMs = 5_000, - triggerChildTask, - useBatch = false, - }: { delayMs: number; triggerChildTask: boolean; useBatch: boolean }, - { ctx } - ) => { - if (triggerChildTask) { - if (useBatch) { - const batchResult = await resumeChildTask.batchTriggerAndWait([ - { - payload: { delayMs }, - options: { tags: ["resume-child"] }, - }, - ]); - - unwrapBatchResult(batchResult); - } else { - await resumeChildTask.triggerAndWait({ delayMs }, { tags: ["resume-child"] }).unwrap(); - } - } else { - await new Promise((resolve) => 
setTimeout(resolve, delayMs));
-    }
-  },
-});
-
-export const resumeChildTask = task({
-  id: "resume-child-task",
-  run: async (payload: { delayMs: number }, { ctx }) => {
-    await new Promise((resolve) => setTimeout(resolve, payload.delayMs));
-  },
-});
-
-export const genericParentTask = task({
-  id: "generic-parent-task",
-  run: async (
-    {
-      delayMs = 5_000,
-      triggerChildTask,
-      useBatch = false,
-    }: { delayMs: number; triggerChildTask: boolean; useBatch: boolean },
-    { ctx }
-  ) => {
-    if (triggerChildTask) {
-      if (useBatch) {
-        const batchResult = await genericChildTask.batchTriggerAndWait([
-          {
-            payload: { delayMs },
-            options: { tags: ["resume-child"] },
-          },
-        ]);
-
-        return unwrapBatchResult(batchResult);
-      } else {
-        await genericChildTask.triggerAndWait({ delayMs }, { tags: ["resume-child"] }).unwrap();
-      }
-    } else {
-      await new Promise((resolve) => setTimeout(resolve, delayMs));
-    }
-  },
-});
-
-function unwrapBatchResult(batchResult: BatchResult) {
-  if (batchResult.runs.some((run) => !run.ok)) {
-    throw new Error(`Child task failed: ${batchResult.runs.find((run) => !run.ok)?.error}`);
-  }
-
-  return batchResult.runs;
-}
-
-export const genericChildTask = task({
-  id: "generic-child-task",
-  run: async (payload: { delayMs: number }, { ctx }) => {
-    await new Promise((resolve) => setTimeout(resolve, payload.delayMs));
-  },
-});
diff --git a/references/test-tasks/src/utils.ts b/references/test-tasks/src/utils.ts
index 21a83e1037..6bbbc38ab8 100644
--- a/references/test-tasks/src/utils.ts
+++ b/references/test-tasks/src/utils.ts
@@ -6,7 +6,8 @@ export type RunStatus = Awaited<ReturnType<typeof runs.retrieve>>["status"];
 export async function waitForRunStatus(
   id: string,
   statuses: RunStatus[],
-  timeoutInSeconds?: number
+  timeoutInSeconds?: number,
+  pollIntervalMs = 1_000
 ) {
   const run = await runs.retrieve(id);
 
@@ -23,7 +24,7 @@ export async function waitForRunStatus(
       return run;
     }
 
-    await new Promise((resolve) => setTimeout(resolve, 1_000));
+    await new Promise((resolve) => setTimeout(resolve, pollIntervalMs));
   }
 
   throw new Error(