From 6bba731d93c918b90009f7ecc772149d46f9acdc Mon Sep 17 00:00:00 2001
From: Eric Allam
Date: Mon, 23 Jun 2025 16:30:30 +0100
Subject: [PATCH 1/2] Improve schedule engine performance

---
 apps/webapp/app/env.server.ts                 |  9 +++-
 apps/webapp/app/v3/alertsWorker.server.ts     |  2 +-
 apps/webapp/app/v3/commonWorker.server.ts     |  2 +-
 .../app/v3/legacyRunEngineWorker.server.ts    |  2 +-
 apps/webapp/app/v3/scheduleEngine.server.ts   |  2 +
 .../run-engine/src/engine/index.ts            |  2 +-
 .../src/engine/distributedScheduling.ts       | 26 ++++++----
 .../schedule-engine/src/engine/index.ts       |  5 +-
 .../schedule-engine/src/engine/types.ts       |  2 +
 packages/redis-worker/src/worker.ts           | 50 ++++++++++++++++++-
 10 files changed, 83 insertions(+), 19 deletions(-)

diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts
index ffdfacf164..226ec1c7e4 100644
--- a/apps/webapp/app/env.server.ts
+++ b/apps/webapp/app/env.server.ts
@@ -618,6 +618,9 @@ const EnvironmentSchema = z.object({
   LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
   LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
   LEGACY_RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
+  LEGACY_RUN_ENGINE_WORKER_LOG_LEVEL: z
+    .enum(["log", "error", "warn", "info", "debug"])
+    .default("info"),
 
   LEGACY_RUN_ENGINE_WORKER_REDIS_HOST: z
     .string()
@@ -661,6 +664,7 @@ const EnvironmentSchema = z.object({
   COMMON_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
   COMMON_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
   COMMON_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
+  COMMON_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
 
   COMMON_WORKER_REDIS_HOST: z
     .string()
@@ -699,6 +703,7 @@
   ALERTS_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(100),
   ALERTS_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
   ALERTS_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
+  ALERTS_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
 
   ALERTS_WORKER_REDIS_HOST: z
     .string()
@@ -732,8 +737,8 @@
   SCHEDULE_ENGINE_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
   SCHEDULE_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
-  SCHEDULE_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(1),
-  SCHEDULE_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(1),
+  SCHEDULE_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
+  SCHEDULE_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
   SCHEDULE_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
   SCHEDULE_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
   SCHEDULE_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(50),
diff --git a/apps/webapp/app/v3/alertsWorker.server.ts b/apps/webapp/app/v3/alertsWorker.server.ts
index 24ebc2ce65..3e1917ead1 100644
--- a/apps/webapp/app/v3/alertsWorker.server.ts
+++ b/apps/webapp/app/v3/alertsWorker.server.ts
@@ -61,7 +61,7 @@ function initializeWorker() {
     pollIntervalMs: env.ALERTS_WORKER_POLL_INTERVAL,
     immediatePollIntervalMs: env.ALERTS_WORKER_IMMEDIATE_POLL_INTERVAL,
     shutdownTimeoutMs: env.ALERTS_WORKER_SHUTDOWN_TIMEOUT_MS,
-    logger: new Logger("AlertsWorker", "debug"),
+    logger: new Logger("AlertsWorker", env.ALERTS_WORKER_LOG_LEVEL),
     jobs: {
       "v3.deliverAlert": async ({ payload }) => {
         const service = new DeliverAlertService();
diff --git a/apps/webapp/app/v3/commonWorker.server.ts b/apps/webapp/app/v3/commonWorker.server.ts
index 2ba633e6d6..78c373a969 100644
--- a/apps/webapp/app/v3/commonWorker.server.ts
+++ b/apps/webapp/app/v3/commonWorker.server.ts
@@ -196,7 +196,7 @@ function initializeWorker() {
     pollIntervalMs: env.COMMON_WORKER_POLL_INTERVAL,
     immediatePollIntervalMs: env.COMMON_WORKER_IMMEDIATE_POLL_INTERVAL,
     shutdownTimeoutMs: env.COMMON_WORKER_SHUTDOWN_TIMEOUT_MS,
-    logger: new Logger("CommonWorker", "debug"),
+    logger: new Logger("CommonWorker", env.COMMON_WORKER_LOG_LEVEL),
     jobs: {
       scheduleEmail: async ({ payload }) => {
         await sendEmail(payload);
diff --git a/apps/webapp/app/v3/legacyRunEngineWorker.server.ts b/apps/webapp/app/v3/legacyRunEngineWorker.server.ts
index 989a206da5..09fdbf886c 100644
--- a/apps/webapp/app/v3/legacyRunEngineWorker.server.ts
+++ b/apps/webapp/app/v3/legacyRunEngineWorker.server.ts
@@ -68,7 +68,7 @@ function initializeWorker() {
     pollIntervalMs: env.LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL,
     immediatePollIntervalMs: env.LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL,
     shutdownTimeoutMs: env.LEGACY_RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS,
-    logger: new Logger("LegacyRunEngineWorker", "debug"),
+    logger: new Logger("LegacyRunEngineWorker", env.LEGACY_RUN_ENGINE_WORKER_LOG_LEVEL),
     jobs: {
       runHeartbeat: async ({ payload }) => {
         const service = new TaskRunHeartbeatFailedService();
diff --git a/apps/webapp/app/v3/scheduleEngine.server.ts b/apps/webapp/app/v3/scheduleEngine.server.ts
index 04dbf4e8f4..cc7bb3bd0e 100644
--- a/apps/webapp/app/v3/scheduleEngine.server.ts
+++ b/apps/webapp/app/v3/scheduleEngine.server.ts
@@ -61,6 +61,8 @@ function createScheduleEngine() {
     },
     worker: {
       concurrency: env.SCHEDULE_WORKER_CONCURRENCY_LIMIT,
+      workers: env.SCHEDULE_WORKER_CONCURRENCY_WORKERS,
+      tasksPerWorker: env.SCHEDULE_WORKER_CONCURRENCY_TASKS_PER_WORKER,
       pollIntervalMs: env.SCHEDULE_WORKER_POLL_INTERVAL,
       shutdownTimeoutMs: env.SCHEDULE_WORKER_SHUTDOWN_TIMEOUT_MS,
       disabled: env.SCHEDULE_WORKER_ENABLED === "0",
diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts
index 0d7a342069..aca188cd2d 100644
--- a/internal-packages/run-engine/src/engine/index.ts
+++ b/internal-packages/run-engine/src/engine/index.ts
@@ -152,7 +152,7 @@ export class RunEngine {
     pollIntervalMs: options.worker.pollIntervalMs,
     immediatePollIntervalMs: options.worker.immediatePollIntervalMs,
     shutdownTimeoutMs: options.worker.shutdownTimeoutMs,
-    logger: new Logger("RunEngineWorker", "debug"),
+    logger: new Logger("RunEngineWorker", options.logLevel ?? "info"),
     jobs: {
       finishWaitpoint: async ({ payload }) => {
         await this.waitpointSystem.completeWaitpoint({
diff --git a/internal-packages/schedule-engine/src/engine/distributedScheduling.ts b/internal-packages/schedule-engine/src/engine/distributedScheduling.ts
index c73b514b35..4c9b6b440d 100644
--- a/internal-packages/schedule-engine/src/engine/distributedScheduling.ts
+++ b/internal-packages/schedule-engine/src/engine/distributedScheduling.ts
@@ -5,21 +5,27 @@
  */
 export function calculateDistributedExecutionTime(
   exactScheduleTime: Date,
-  distributionWindowSeconds: number = 30
+  distributionWindowSeconds: number = 30,
+  instanceId?: string
 ): Date {
-  // Use the ISO string of the exact schedule time as the seed for consistency
-  const seed = exactScheduleTime.toISOString();
+  // Create seed by combining ISO timestamp with optional instanceId
+  // This ensures different instances get different distributions even with same schedule time
+  const timeSeed = exactScheduleTime.toISOString();
+  const seed = instanceId ? `${timeSeed}:${instanceId}` : timeSeed;
+
+  // Use a better hash function (FNV-1a variant) for more uniform distribution
+  let hash = 2166136261; // FNV offset basis (32-bit)
 
-  // Create a simple hash from the seed string
-  let hash = 0;
   for (let i = 0; i < seed.length; i++) {
-    const char = seed.charCodeAt(i);
-    hash = (hash << 5) - hash + char;
-    hash = hash & hash; // Convert to 32-bit integer
+    hash ^= seed.charCodeAt(i);
+    hash *= 16777619; // FNV prime (32-bit)
+    // Keep it as 32-bit unsigned integer
+    hash = hash >>> 0;
   }
 
-  // Convert hash to a value between 0 and 1
-  const normalized = Math.abs(hash) / Math.pow(2, 31);
+  // Convert hash to a value between 0 and 1 using better normalization
+  // Use the full 32-bit range for better distribution
+  const normalized = hash / 0xffffffff;
 
   // Calculate offset in milliseconds (0 to distributionWindowSeconds * 1000)
   const offsetMs = Math.floor(normalized * distributionWindowSeconds * 1000);
diff --git a/internal-packages/schedule-engine/src/engine/index.ts b/internal-packages/schedule-engine/src/engine/index.ts
index 2e4292d6e5..83f7ea93f3 100644
--- a/internal-packages/schedule-engine/src/engine/index.ts
+++ b/internal-packages/schedule-engine/src/engine/index.ts
@@ -92,6 +92,8 @@ export class ScheduleEngine {
       catalog: scheduleWorkerCatalog,
       concurrency: {
         limit: options.worker.concurrency,
+        workers: options.worker.workers,
+        tasksPerWorker: options.worker.tasksPerWorker,
       },
       pollIntervalMs: options.worker.pollIntervalMs,
       shutdownTimeoutMs: options.worker.shutdownTimeoutMs,
@@ -590,7 +592,8 @@
 
     const distributedExecutionTime = calculateDistributedExecutionTime(
       exactScheduleTime,
-      this.distributionWindowSeconds
+      this.distributionWindowSeconds,
+      instanceId
     );
 
     const distributionOffsetMs = exactScheduleTime.getTime() - distributedExecutionTime.getTime();
diff --git a/internal-packages/schedule-engine/src/engine/types.ts b/internal-packages/schedule-engine/src/engine/types.ts
index 1d6425cfbf..6b62fd4175 100644
--- a/internal-packages/schedule-engine/src/engine/types.ts
+++ b/internal-packages/schedule-engine/src/engine/types.ts
@@ -35,6 +35,8 @@ export interface ScheduleEngineOptions {
   redis: RedisOptions;
   worker: {
     concurrency: number;
+    workers?: number;
+    tasksPerWorker?: number;
     pollIntervalMs?: number;
     shutdownTimeoutMs?: number;
     disabled?: boolean;
diff --git a/packages/redis-worker/src/worker.ts b/packages/redis-worker/src/worker.ts
index 644b804162..47e107e034 100644
--- a/packages/redis-worker/src/worker.ts
+++ b/packages/redis-worker/src/worker.ts
@@ -206,7 +206,7 @@ class Worker {
 
     // Launch a number of "worker loops" on the main thread.
     for (let i = 0; i < workers; i++) {
-      this.workerLoops.push(this.runWorkerLoop(`worker-${nanoid(12)}`, tasksPerWorker));
+      this.workerLoops.push(this.runWorkerLoop(`worker-${nanoid(12)}`, tasksPerWorker, i, workers));
     }
 
     this.setupShutdownHandlers();
@@ -390,14 +390,43 @@ class Worker {
    * The main loop that each worker runs. It repeatedly polls for items,
    * processes them, and then waits before the next iteration.
    */
-  private async runWorkerLoop(workerId: string, taskCount: number): Promise<void> {
+  private async runWorkerLoop(
+    workerId: string,
+    taskCount: number,
+    workerIndex: number,
+    totalWorkers: number
+  ): Promise<void> {
     const pollIntervalMs = this.options.pollIntervalMs ?? 1000;
     const immediatePollIntervalMs = this.options.immediatePollIntervalMs ?? 100;
 
+    // Calculate the delay between starting each worker loop so that they don't all start at the same time.
+    const delayBetweenWorkers = this.options.pollIntervalMs ?? 1000;
+    const delay = delayBetweenWorkers * (totalWorkers - workerIndex);
+    await Worker.delay(delay);
+
+    this.logger.info("Starting worker loop", {
+      workerIndex,
+      totalWorkers,
+      delay,
+      workerId,
+      taskCount,
+      pollIntervalMs,
+      immediatePollIntervalMs,
+      concurrencyOptions: this.concurrency,
+    });
+
     while (!this.isShuttingDown) {
       // Check overall load. If at capacity, wait a bit before trying to dequeue more.
       if (this.limiter.activeCount + this.limiter.pendingCount >= this.concurrency.limit) {
+        this.logger.debug("Worker at capacity, waiting", {
+          workerId,
+          concurrencyOptions: this.concurrency,
+          activeCount: this.limiter.activeCount,
+          pendingCount: this.limiter.pendingCount,
+        });
+
         await Worker.delay(pollIntervalMs);
         continue;
       }
@@ -412,10 +441,25 @@ class Worker {
       );
 
       if (items.length === 0) {
+        this.logger.debug("No items to dequeue", {
+          workerId,
+          concurrencyOptions: this.concurrency,
+          activeCount: this.limiter.activeCount,
+          pendingCount: this.limiter.pendingCount,
+        });
+
         await Worker.delay(pollIntervalMs);
         continue;
       }
 
+      this.logger.info("Dequeued items", {
+        workerId,
+        itemCount: items.length,
+        concurrencyOptions: this.concurrency,
+        activeCount: this.limiter.activeCount,
+        pendingCount: this.limiter.pendingCount,
+      });
+
       // Schedule each item using the limiter.
       for (const item of items) {
         this.limiter(() => this.processItem(item as AnyQueueItem, items.length, workerId)).catch(
@@ -433,6 +477,8 @@ class Worker {
       // Wait briefly before immediately polling again since we processed items
       await Worker.delay(immediatePollIntervalMs);
     }
+
+    this.logger.info("Worker loop finished", { workerId });
   }
 
   /**

From fcc78b37a8b5b4fa55c4e7f2f98b4ec55d93d355 Mon Sep 17 00:00:00 2001
From: Eric Allam
Date: Mon, 23 Jun 2025 16:41:01 +0100
Subject: [PATCH 2/2] Convert an info log to a debug log

---
 packages/redis-worker/src/worker.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/packages/redis-worker/src/worker.ts b/packages/redis-worker/src/worker.ts
index 47e107e034..69a1bbf8b8 100644
--- a/packages/redis-worker/src/worker.ts
+++ b/packages/redis-worker/src/worker.ts
@@ -452,7 +452,7 @@ class Worker {
         continue;
       }
 
-      this.logger.info("Dequeued items", {
+      this.logger.debug("Dequeued items", {
         workerId,
         itemCount: items.length,
         concurrencyOptions: this.concurrency,
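
Note on the distributed-scheduling change in PATCH 1/2: the new FNV-1a-style hash maps every (schedule time, instance ID) pair deterministically to an offset inside the distribution window, so the same schedule on the same instance always lands on the same slot, while different instances spread out across the window. The same staggering idea appears in the redis-worker change, where loop i waits delayBetweenWorkers * (totalWorkers - workerIndex) ms before its first poll so the loops come online one interval apart. Below is a minimal standalone sketch of the patched function for experimentation only: the hash body mirrors the diff, but the final subtraction step is inferred from index.ts (where distributionOffsetMs = exactScheduleTime - distributedExecutionTime is expected to be non-negative), and the instance IDs in the usage lines are hypothetical.

    // sketch.ts -- illustration only; the real implementation lives in
    // internal-packages/schedule-engine/src/engine/distributedScheduling.ts
    function calculateDistributedExecutionTime(
      exactScheduleTime: Date,
      distributionWindowSeconds: number = 30,
      instanceId?: string
    ): Date {
      // Seed combines the schedule tick with the (optional) instance identity.
      const timeSeed = exactScheduleTime.toISOString();
      const seed = instanceId ? `${timeSeed}:${instanceId}` : timeSeed;

      // FNV-1a: XOR in each character, multiply by the FNV prime, keep 32 bits.
      let hash = 2166136261;
      for (let i = 0; i < seed.length; i++) {
        hash ^= seed.charCodeAt(i);
        hash *= 16777619;
        hash = hash >>> 0;
      }

      // Map the 32-bit hash onto [0, 1], then onto the distribution window.
      const normalized = hash / 0xffffffff;
      const offsetMs = Math.floor(normalized * distributionWindowSeconds * 1000);

      // Inferred step: schedule at or before the exact tick so that
      // exact - distributed (distributionOffsetMs in index.ts) is >= 0.
      return new Date(exactScheduleTime.getTime() - offsetMs);
    }

    // Same tick, different (hypothetical) instance IDs: each instance gets its
    // own deterministic slot inside the 30-second window before the tick.
    const tick = new Date("2025-06-23T17:00:00.000Z");
    for (const id of ["instance-a", "instance-b", "instance-c"]) {
      console.log(id, calculateDistributedExecutionTime(tick, 30, id).toISOString());
    }

One caveat: because hash can approach 2^32, hash *= 16777619 overflows Number.MAX_SAFE_INTEGER before the >>> 0 truncation, so this is an FNV-1a variant rather than bit-exact FNV-1a. It is still deterministic for a given seed, which is the only property the distribution relies on.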