diff --git a/__tests__/migrate.test.ts b/__tests__/migrate.test.ts index eb61cc61..03b00514 100644 --- a/__tests__/migrate.test.ts +++ b/__tests__/migrate.test.ts @@ -14,7 +14,7 @@ import { const options: WorkerSharedOptions = {}; -const MAX_MIGRATION_NUMBER = 18; +const MAX_MIGRATION_NUMBER = 19; test("migration installs schema; second migration does no harm", async () => { await withPgClient(async (pgClient) => { @@ -238,6 +238,7 @@ test("throws helpful error message in migration 11", async () => { // Manually run the first 10 migrations const event = { + ctx: compiledSharedOptions, client: pgClient, postgresVersion: 120000, // TODO: use the actual postgres version scratchpad: Object.create(null), diff --git a/__tests__/schema.sql b/__tests__/schema.sql index f8b77b65..1e545a57 100644 --- a/__tests__/schema.sql +++ b/__tests__/schema.sql @@ -374,3 +374,25 @@ ALTER TABLE graphile_worker._private_job_queues ENABLE ROW LEVEL SECURITY; ALTER TABLE graphile_worker._private_jobs ENABLE ROW LEVEL SECURITY; ALTER TABLE graphile_worker._private_known_crontabs ENABLE ROW LEVEL SECURITY; ALTER TABLE graphile_worker._private_tasks ENABLE ROW LEVEL SECURITY; +SELECT pg_catalog.set_config('search_path', '', false); +COPY graphile_worker.migrations (id, ts, breaking) FROM stdin; +1 1970-01-01 00:00:00.000000+00 t +2 1970-01-01 00:00:00.000000+00 f +3 1970-01-01 00:00:00.000000+00 t +4 1970-01-01 00:00:00.000000+00 f +5 1970-01-01 00:00:00.000000+00 f +6 1970-01-01 00:00:00.000000+00 f +7 1970-01-01 00:00:00.000000+00 f +8 1970-01-01 00:00:00.000000+00 f +9 1970-01-01 00:00:00.000000+00 f +10 1970-01-01 00:00:00.000000+00 f +11 1970-01-01 00:00:00.000000+00 t +12 1970-01-01 00:00:00.000000+00 f +13 1970-01-01 00:00:00.000000+00 t +14 1970-01-01 00:00:00.000000+00 t +15 1970-01-01 00:00:00.000000+00 f +16 1970-01-01 00:00:00.000000+00 t +17 1970-01-01 00:00:00.000000+00 f +18 1970-01-01 00:00:00.000000+00 f +19 1970-01-01 00:00:00.000000+00 t +\. 
diff --git a/package.json b/package.json index 126d46d3..f99ecdb1 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "graphile-worker", - "version": "0.16.6", + "version": "0.17.0-canary.1fcb2a0", "type": "commonjs", "description": "Job queue for PostgreSQL", "main": "dist/index.js", @@ -18,6 +18,7 @@ "depcheck": "depcheck --ignores='graphile-worker,faktory-worker,@google-cloud/tasks,bullmq,jest-environment-node,@docusaurus/*,@fortawesome/*,@mdx-js/*,@types/jest,clsx,eslint_d,graphile,juice,postcss-nested,prism-react-renderer,react,react-dom,svgo,ts-node,@types/debug,tslib'", "db:dump": "./scripts/dump_db", "perfTest": "cd perfTest && node ./run.js", + "towerDefence": "cd towerDefence && node ./run.mjs", "preversion": "grep '^### Pending' RELEASE_NOTES.md && echo \"⚠️ Cannot publish with 'Pending' in RELEASE_NOTES ⚠️\" && exit 1 || true", "version": "node scripts/postversion.mjs && git add src/version.ts", "website": "cd website && yarn run", @@ -51,11 +52,10 @@ "homepage": "https://github.com/graphile/worker#readme", "dependencies": { "@graphile/logger": "^0.2.0", - "@tsconfig/node18": "^18.2.4", "@types/debug": "^4.1.10", "@types/pg": "^8.10.5", "cosmiconfig": "^8.3.6", - "graphile-config": "^0.0.1-beta.11", + "graphile-config": "^0.0.1-beta.14", "json5": "^2.2.3", "pg": "^8.11.3", "tslib": "^2.6.2", @@ -72,6 +72,7 @@ "@fortawesome/free-solid-svg-icons": "^6.5.1", "@fortawesome/react-fontawesome": "^0.2.0", "@mdx-js/react": "^1.6.22", + "@tsconfig/node18": "^18.2.4", "@types/jest": "^26.0.0", "@types/json5": "^2.2.0", "@types/node": "^20.8.7", diff --git a/perfTest/graphile.config.js b/perfTest/graphile.config.js index 32ef26f8..76a20ee6 100644 --- a/perfTest/graphile.config.js +++ b/perfTest/graphile.config.js @@ -5,16 +5,29 @@ // import { WorkerProPreset } from "../graphile-pro-worker/dist/index.js"; +const CONCURRENT_JOBS = 24; + /** @type {GraphileConfig.Preset} */ const preset = { // extends: [WorkerProPreset], worker: { connectionString: process.env.PERF_DATABASE_URL || "postgres:///graphile_worker_perftest", - concurrentJobs: 3, fileExtensions: [".js", ".cjs", ".mjs"], // fileExtensions: [".js", ".cjs", ".mjs", ".ts", ".cts", ".mts"], gracefulShutdownAbortTimeout: 2500, + + concurrentJobs: CONCURRENT_JOBS, + maxPoolSize: CONCURRENT_JOBS + 1, + + //localQueue: { size: -1 }, + //completeJobBatchDelay: -1, + //failJobBatchDelay: -1, + + localQueue: { size: 500, refetchDelay: { durationMs: 10 } }, + completeJobBatchDelay: 0, + failJobBatchDelay: 0, }, }; + module.exports = preset; diff --git a/perfTest/init.js b/perfTest/init.js index 1cd6f952..f9d779d9 100644 --- a/perfTest/init.js +++ b/perfTest/init.js @@ -36,21 +36,19 @@ $$ language plpgsql;`, } else { const jobs = []; for (let i = 0; i < jobCount; i++) { - jobs.push( - `("${taskIdentifier.replace( - /["\\]/g, - "\\$&", - )}","{\\"id\\":${i}}",,,,,,)`, + jobs.push({ identifier: taskIdentifier, payload: { id: i } }); + } + console.time(`Adding jobs`); + while (jobs.length > 0) { + const jobsSlice = jobs.splice(0, 1000000); + const jobsString = JSON.stringify(jobsSlice); + console.log(`Adding ${jobsSlice.length} jobs`); + await pgPool.query( + `select 1 from graphile_worker.add_jobs(array(select json_populate_recordset(null::graphile_worker.job_spec, $1::json)));`, + [jobsString], ); + console.log(`...added`); } - const jobsString = `{"${jobs - .map((j) => j.replace(/["\\]/g, "\\$&")) - .join('","')}"}`; - console.time("Adding jobs"); - await pgPool.query( - `select 
graphile_worker.add_jobs($1::graphile_worker.job_spec[]);`, - [jobsString], - ); console.timeEnd("Adding jobs"); } diff --git a/perfTest/latencyTest.js b/perfTest/latencyTest.js index 2dd5c685..5b1a37ef 100644 --- a/perfTest/latencyTest.js +++ b/perfTest/latencyTest.js @@ -9,10 +9,7 @@ const preset = require("./graphile.config.js"); const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); /** @type {import('../dist/index.js').WorkerPoolOptions} */ -const options = { - concurrency: 1, - preset, -}; +const options = { preset }; async function main() { const pgPool = new Pool({ connectionString: process.env.PERF_DATABASE_URL }); diff --git a/perfTest/run.js b/perfTest/run.js index 4235ba54..a94d353f 100755 --- a/perfTest/run.js +++ b/perfTest/run.js @@ -3,10 +3,9 @@ const { execSync, exec: rawExec } = require("child_process"); const { promisify } = require("util"); const exec = promisify(rawExec); -const JOB_COUNT = 20000; +const JOB_COUNT = 200000; const STUCK_JOB_COUNT = 0; const PARALLELISM = 4; -const CONCURRENCY = 10; const time = async (cb) => { const start = process.hrtime(); @@ -52,10 +51,7 @@ async function main() { console.log("Timing startup/shutdown time..."); let result; const startupTime = await time(async () => { - result = await exec( - `node ../dist/cli.js --once -j ${CONCURRENCY} -m ${CONCURRENCY + 1}`, - execOptions, - ); + result = await exec(`node ../dist/cli.js --once`, execOptions); }); logResult(result); console.log(); @@ -81,12 +77,7 @@ async function main() { const dur = await time(async () => { const promises = []; for (let i = 0; i < PARALLELISM; i++) { - promises.push( - exec( - `node ../dist/cli.js --once -j ${CONCURRENCY} -m ${CONCURRENCY + 1}`, - execOptions, - ), - ); + promises.push(exec(`node ../dist/cli.js --once`, execOptions)); } (await Promise.all(promises)).map(logResult); }); diff --git a/scripts/dump_db b/scripts/dump_db index 1ee62a79..4dc03203 100755 --- a/scripts/dump_db +++ b/scripts/dump_db @@ -6,6 +6,7 @@ dropuser graphile_worker_role || true psql template1 -c "CREATE USER graphile_worker_role WITH SUPERUSER PASSWORD 'password';" createdb graphile_worker_dump -O graphile_worker_role PGUSER=graphile_worker_role PGPASSWORD=password PGHOST=127.0.0.1 ts-node src/cli.ts -c postgres:///graphile_worker_dump --schema-only -pg_dump --schema-only --no-owner graphile_worker_dump | sed -e '/^--/d' -e '/^\s*$/d' -e '/^SET /d' -e 's/EXECUTE FUNCTION/EXECUTE PROCEDURE/g' > __tests__/schema.sql +pg_dump --schema-only --no-owner graphile_worker_dump | sed -E -e '/^--/d' -e '/^\s*$/d' -e '/^SET /d' -e 's/EXECUTE FUNCTION/EXECUTE PROCEDURE/g' -e '/^(REVOKE|GRANT) .* ON SCHEMA public (FROM|TO) PUBLIC;$/d' > __tests__/schema.sql +pg_dump --data-only --no-owner graphile_worker_dump --table=graphile_worker.migrations --table=graphile_worker._private_pro_migrations | sed -E -e '/^--/d' -e '/^\s*$/d' -e 's/\b2[0-9]{3}-[0-9]{2}-[0-9]{2}\s[0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{1,6}\+00/1970-01-01 00:00:00.000000+00/g' -e '/^SET /d' >> __tests__/schema.sql dropdb graphile_worker_dump dropuser graphile_worker_role diff --git a/sql/000019.sql b/sql/000019.sql new file mode 100644 index 00000000..56d7aa8e --- /dev/null +++ b/sql/000019.sql @@ -0,0 +1,4 @@ +--! breaking-change +-- This is just a breaking change marker for the v0.17 worker-centric to +-- pool-centric jump. The migration itself is not breaking. 
+select 1; diff --git a/src/cron.ts b/src/cron.ts index 4b6cfe4c..512e84eb 100644 --- a/src/cron.ts +++ b/src/cron.ts @@ -179,6 +179,7 @@ async function scheduleCronJobs( * performs backfilling on any crontab tasks that need it. */ async function registerAndBackfillItems( + ctx: CompiledSharedOptions, { pgPool, events, cron }: { pgPool: Pool; events: WorkerEvents; cron: Cron }, escapedWorkerSchema: string, parsedCronItems: ParsedCronItem[], @@ -261,6 +262,7 @@ async function registerAndBackfillItems( // At this time it's not expected that backfilling will be sufficiently // expensive to justify optimising this further. events.emit("cron:backfill", { + ctx, cron, itemsToBackfill, timestamp: ts, @@ -338,11 +340,13 @@ export const runCron = ( } const start = new Date(); - events.emit("cron:starting", { cron, start }); + const ctx = compiledSharedOptions; + events.emit("cron:starting", { ctx, cron, start }); // We must backfill BEFORE scheduling any new jobs otherwise backfill won't // work due to known_crontabs.last_execution having been updated. await registerAndBackfillItems( + ctx, { pgPool, events, cron }, escapedWorkerSchema, parsedCronItems, @@ -350,7 +354,7 @@ export const runCron = ( useNodeTime, ); - events.emit("cron:started", { cron, start }); + events.emit("cron:started", { ctx, cron, start }); if (!cron._active) { return stop(); @@ -411,6 +415,7 @@ export const runCron = ( }, ); events.emit("cron:prematureTimer", { + ctx, cron, currentTimestamp, expectedTimestamp, @@ -427,6 +432,7 @@ export const runCron = ( )}s behind)`, ); events.emit("cron:overdueTimer", { + ctx, cron, currentTimestamp, expectedTimestamp, @@ -449,6 +455,7 @@ export const runCron = ( // Finally actually run the jobs. if (jobsAndIdentifiers.length) { events.emit("cron:schedule", { + ctx, cron, timestamp: expectedTimestamp, jobsAndIdentifiers, @@ -461,6 +468,7 @@ export const runCron = ( useNodeTime, ); events.emit("cron:scheduled", { + ctx, cron, timestamp: expectedTimestamp, jobsAndIdentifiers, diff --git a/src/generated/sql.ts b/src/generated/sql.ts index 6a6fe7ec..df57995a 100644 --- a/src/generated/sql.ts +++ b/src/generated/sql.ts @@ -2359,5 +2359,10 @@ begin return v_job; end; $$; +`, + "000019.sql": String.raw`--! breaking-change +-- This is just a breaking change marker for the v0.17 worker-centric to +-- pool-centric jump. The migration itself is not breaking. +select 1; `, }; diff --git a/src/index.ts b/src/index.ts index 4c61d351..6f431c9b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,5 +1,5 @@ import { Logger } from "@graphile/logger"; -import { PluginHook } from "graphile-config"; +import { MiddlewareHandlers, PluginHook } from "graphile-config"; import type { PoolClient } from "pg"; import { getCronItems } from "./getCronItems"; @@ -7,12 +7,18 @@ import { getTasks } from "./getTasks"; import { FileDetails, PromiseOrDirect, + RunOnceOptions, + SharedOptions, Task, TaskList, WithPgClient, Worker, WorkerEvents, + WorkerPluginBaseContext, WorkerPluginContext, + WorkerPool, + WorkerSharedOptions, + WorkerUtilsOptions, } from "./interfaces"; import { CompiledSharedOptions } from "./lib"; export { parseCronItem, parseCronItems, parseCrontab } from "./crontab"; @@ -37,8 +43,33 @@ declare global { interface Tasks { /* extend this through declaration merging */ } + interface InitEvent { + ctx: WorkerPluginBaseContext; + } + interface BootstrapEvent { + ctx: WorkerPluginContext; + + /** + * The client used to perform the bootstrap. Replacing this is not officially + * supported, but... 
+ */ + client: PoolClient; + + /** + * The Postgres version number, e.g. 120000 for PostgreSQL 12.0 + */ + readonly postgresVersion: number; + + /** + * Somewhere to store temporary data from plugins, only used during + * bootstrap and migrate + */ + readonly scratchpad: Record; + } interface MigrateEvent { + ctx: WorkerPluginContext; + /** * The client used to run the migration. Replacing this is not officially * supported, but... @@ -56,6 +87,33 @@ declare global { */ readonly scratchpad: Record; } + + interface PoolGracefulShutdownEvent { + ctx: WorkerPluginContext; + workerPool: WorkerPool; + message: string; + } + + interface PoolForcefulShutdownEvent { + ctx: WorkerPluginContext; + workerPool: WorkerPool; + message: string; + } + + interface PoolWorkerPrematureExitEvent { + ctx: WorkerPluginContext; + workerPool: WorkerPool; + worker: Worker; + /** + * Use this to spin up a new Worker in place of the old one that failed. + * Generally a Worker fails due to some underlying network or database + * issue, and just spinning up a new one in its place may simply mask the + * issue, so this is not recommended. + * + * Only the first call to this method (per event) will have any effect. + */ + replaceWithNewWorker(): void; + } } namespace GraphileConfig { @@ -203,14 +261,141 @@ declare global { * promise returned by the API you have called has resolved.) */ events?: WorkerEvents; + + /** + * If you're running in high concurrency, you will likely want to reduce + * the load on the database by using a local queue to distribute jobs to + * workers rather than having each ask the database directly. + */ + localQueue?: { + /** + * To enable processing jobs in batches, set this to an integer larger + * than 1. This will result in jobs being fetched by the pool rather than + * the worker, the pool will fetch (and lock!) `localQueue.size` jobs up + * front, and each time a worker requests a job it will be served from + * this list until the list is exhausted, at which point a new set of + * jobs will be fetched (and locked). + * + * This setting can help reduce the load on your database from looking + * for jobs, but is only really effective when there are often many jobs + * queued and ready to go, and can increase the latency of job execution + * because a single worker may lock jobs into its queue leaving other + * workers idle. + * + * @default `-1` + */ + size: number; + + /** + * How long (in milliseconds) should jobs sit in the local queue before + * they are returned to the database? Defaults to 5 minutes. + * + * @default `300000` + */ + ttl?: number; + + /** + * When running at very high scale (multiple worker instances, each + * with some level of concurrency), Worker's polling can cause + * significant load on the database when there are too few jobs in the + * database to keep all worker pools busy - each time a new job comes + * in, each pool may request it, multiplying up the load. To reduce + * this impact, when a pool receives no (or few) results to its query + * for new jobs, we can instigate a "refetch delay" to cause the pool + * to wait before issuing its next poll for jobs, even when new job + * notifications come in. + */ + refetchDelay?: { + /** + * How long in milliseconds to wait, on average, before asking for + * more jobs when a previous fetch results in insufficient jobs to + * fill the local queue. (Causes the local queue to (mostly) ignore + * "new job" notifications.) 
+ * + * When new jobs are coming in but the workers are mostly idle, you + * can expect on average `(1000/durationMs) * INSTANCE_COUNT` "get jobs" + * queries per second to be issued to your database. Increasing this + * decreases database load at the cost of increased latency when there + * are insufficient jobs in the database to keep the local queue full. + */ + durationMs: number; + + /** + * How many jobs should a fetch return to trigger the refetchDelay? + * Must be less than the local queue size + * + * @default {0} + */ + threshold?: number; + + /** + * How many new jobs can a pool that's in refetch delay be notified + * of before it must abort the refetch delay and fetch anyway. + * + * Note that because you may have many different workers in refetch + * delay we take a random number up to this threshold, this means + * that different workers will abort refetch delay at different times + * which a) helps avoid the thundering herd problem, and b) helps to + * reduce the latency of executing a new job when all workers are in + * refetch delay. + * + * We don't know the best value for this, it likely will change based + * on a large number of factors. If you're not sure what to set it + * to, we recommend you start by taking `localQueue.size` and + * multiplying it by the number of Graphile Worker instances you're + * running (ignoring their `concurrency` settings). Then iterate + * based on the behaviors you observe. And report back to us - we'd + * love to hear about what works and what doesn't! + * + * To force the full refetch delay to always apply, set this to + * `Infinity` since `Math.random() * Infinity = Infinity` (except in + * the case that Math.random() is zero, but that's only got a 1 in + * 2^53 chance of happening so you're probably fine, right? Don't + * worry, we handle this.) + * + * @default {5 * localQueue.size} + */ + maxAbortThreshold?: number; + }; + }; + + /** + * The time in milliseconds to wait after a `completeJob` call to see if + * there are any other completeJob calls that can be batched together. A + * setting of `-1` disables this. + * + * Enabling this feature increases the time for which jobs are locked + * past completion, thus increasing the risk of catastrophic failure + * resulting in the jobs being executed again once they expire. + * + * @default `-1` + */ + completeJobBatchDelay?: number; + + /** + * The time in milliseconds to wait after a `failJob` call to see if + * there are any other failJob calls that can be batched together. A + * setting of `-1` disables this. + * + * Enabling this feature increases the time for which jobs are locked + * past failure. + * + * @default `-1` + */ + failJobBatchDelay?: number; } interface Preset { + /** Options for Graphile Worker */ worker?: WorkerOptions; } interface Plugin { + /** Plugin hooks and middleware for Graphile Worker */ worker?: { + middleware?: MiddlewareHandlers; + + // TODO: deprecate this, replace with middleware hooks?: { [key in keyof WorkerHooks]?: PluginHook< WorkerHooks[key] extends (...args: infer UArgs) => infer UResult @@ -221,6 +406,58 @@ declare global { }; } + interface WorkerMiddleware { + /** + * Called when Graphile Worker starts up. + */ + init< + T extends + | SharedOptions + | WorkerSharedOptions + | WorkerOptions + | RunOnceOptions + | WorkerUtilsOptions, + >( + event: GraphileWorker.InitEvent, + ): CompiledSharedOptions; + + /** + * Called when installing the Graphile Worker DB schema (or upgrading it). 
+ */ + bootstrap(event: GraphileWorker.BootstrapEvent): PromiseOrDirect; + + /** + * Called when migrating the Graphile Worker DB. + */ + migrate(event: GraphileWorker.MigrateEvent): PromiseOrDirect; + + /** + * Called when performing a graceful shutdown on a WorkerPool. + */ + poolGracefulShutdown( + event: GraphileWorker.PoolGracefulShutdownEvent, + ): ReturnType; + + /** + * Called when performing a forceful shutdown on a WorkerPool. + */ + poolForcefulShutdown( + event: GraphileWorker.PoolForcefulShutdownEvent, + ): ReturnType; + + /** + * Called when a Worker inside a WorkerPool exits unexpectedly; + * allows user to choose how to handle this; for example: + * + * - graceful shutdown (default behavior) + * - forceful shutdown (probably best after a delay?) + * - boot up a replacement worker via `createNewWorker` + */ + poolWorkerPrematureExit( + event: GraphileWorker.PoolWorkerPrematureExitEvent, + ): void; + } + interface WorkerHooks { /** * Called when Graphile Worker starts up. diff --git a/src/interfaces.ts b/src/interfaces.ts index 431527cf..160a969c 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -1,7 +1,7 @@ /* eslint-disable @typescript-eslint/ban-types */ import type { EventEmitter } from "events"; import type { Stats } from "fs"; -import { AsyncHooks } from "graphile-config"; +import { AsyncHooks, Middleware } from "graphile-config"; import type { Notification, Pool, @@ -15,6 +15,7 @@ import type { Release, ResolvedWorkerPreset, } from "./lib"; +import { LocalQueue } from "./localQueue"; import type { Logger } from "./logger"; import type { Signal } from "./signals"; @@ -38,9 +39,7 @@ export interface WithPgClient { export interface EnhancedWithPgClient extends WithPgClient { /** **Experimental**; see https://github.com/graphile/worker/issues/387 */ - withRetries: ( - callback: (pgClient: PoolClient) => Promise, - ) => Promise; + withRetries: WithPgClient; } /** @@ -516,10 +515,14 @@ export interface Worker { export interface WorkerPool { id: string; + /** Encourage `n` workers to look for jobs _right now_, cancelling the delay timers. */ + nudge(n: number): void; /** @deprecated Use gracefulShutdown instead */ - release: () => Promise; - gracefulShutdown: (message?: string) => Promise; - forcefulShutdown: (message: string) => Promise; + release: () => PromiseOrDirect; + gracefulShutdown: (message?: string) => PromiseOrDirect; + forcefulShutdown: (message: string) => PromiseOrDirect<{ + forceFailedJobs: readonly Job[]; + }>; promise: Promise; /** Fires 'abort' when all running jobs should stop because worker is shutting down. 
@experimental */ abortSignal: AbortSignal; @@ -528,6 +531,8 @@ export interface WorkerPool { /** @internal */ _shuttingDown: boolean; /** @internal */ + _forcefulShuttingDown: boolean; + /** @internal */ _active: boolean; /** @internal */ _workers: Worker[]; @@ -562,7 +567,10 @@ export interface WorkerPool { } export interface Runner { + /** Attempts to cleanly shut down the runner */ stop: () => Promise; + /** Use .stop() instead, unless you know what you're doing */ + kill: () => Promise; addJob: AddJobFunction; promise: Promise; events: WorkerEvents; @@ -895,23 +903,32 @@ export type WorkerEventMap = { /** * When a worker pool is created */ - "pool:create": { workerPool: WorkerPool }; + "pool:create": { ctx: WorkerPluginContext; workerPool: WorkerPool }; /** * When a worker pool attempts to connect to PG ready to issue a LISTEN * statement */ - "pool:listen:connecting": { workerPool: WorkerPool; attempts: number }; + "pool:listen:connecting": { + ctx: WorkerPluginContext; + workerPool: WorkerPool; + attempts: number; + }; /** * When a worker pool starts listening for jobs via PG LISTEN */ - "pool:listen:success": { workerPool: WorkerPool; client: PoolClient }; + "pool:listen:success": { + ctx: WorkerPluginContext; + workerPool: WorkerPool; + client: PoolClient; + }; /** * When a worker pool faces an error on their PG LISTEN client */ "pool:listen:error": { + ctx: WorkerPluginContext; workerPool: WorkerPool; error: unknown; }; @@ -920,6 +937,7 @@ export type WorkerEventMap = { * When a worker pool receives a notification */ "pool:listen:notification": { + ctx: WorkerPluginContext; workerPool: WorkerPool; message: Notification; client: PoolClient; @@ -929,15 +947,27 @@ export type WorkerEventMap = { * When a worker pool listening client is no longer available */ "pool:listen:release": { + ctx: WorkerPluginContext; workerPool: WorkerPool; /** If you use this client, be careful to handle errors - it may be in an invalid state (errored, disconnected, etc). 
*/ client: PoolClient; }; + /** + * When a worker pool fails to complete/fail a job + */ + "pool:fatalError": { + ctx: WorkerPluginContext; + workerPool: WorkerPool; + error: unknown; + action: string; + }; + /** * When a worker pool is released */ "pool:release": { + ctx: WorkerPluginContext; /** @deprecated Use workerPool for consistency */ pool: WorkerPool; workerPool: WorkerPool; @@ -947,6 +977,7 @@ export type WorkerEventMap = { * When a worker pool starts a graceful shutdown */ "pool:gracefulShutdown": { + ctx: WorkerPluginContext; /** @deprecated Use workerPool for consistency */ pool: WorkerPool; workerPool: WorkerPool; @@ -957,6 +988,7 @@ export type WorkerEventMap = { * When a worker pool graceful shutdown throws an error */ "pool:gracefulShutdown:error": { + ctx: WorkerPluginContext; /** @deprecated Use workerPool for consistency */ pool: WorkerPool; workerPool: WorkerPool; @@ -968,6 +1000,7 @@ export type WorkerEventMap = { * throws an error from release() */ "pool:gracefulShutdown:workerError": { + ctx: WorkerPluginContext; /** @deprecated Use workerPool for consistency */ pool: WorkerPool; workerPool: WorkerPool; @@ -979,6 +1012,7 @@ export type WorkerEventMap = { * When a worker pool graceful shutdown throws an error */ "pool:gracefulShutdown:complete": { + ctx: WorkerPluginContext; /** @deprecated Use workerPool for consistency */ pool: WorkerPool; workerPool: WorkerPool; @@ -988,6 +1022,7 @@ export type WorkerEventMap = { * When a worker pool starts a forceful shutdown */ "pool:forcefulShutdown": { + ctx: WorkerPluginContext; /** @deprecated Use workerPool for consistency */ pool: WorkerPool; workerPool: WorkerPool; @@ -998,6 +1033,7 @@ export type WorkerEventMap = { * When a worker pool forceful shutdown throws an error */ "pool:forcefulShutdown:error": { + ctx: WorkerPluginContext; /** @deprecated Use workerPool for consistency */ pool: WorkerPool; workerPool: WorkerPool; @@ -1008,45 +1044,126 @@ export type WorkerEventMap = { * When a worker pool forceful shutdown throws an error */ "pool:forcefulShutdown:complete": { + ctx: WorkerPluginContext; /** @deprecated Use workerPool for consistency */ pool: WorkerPool; workerPool: WorkerPool; }; + /** + * When a local queue is created + */ + "localQueue:init": { ctx: WorkerPluginContext; localQueue: LocalQueue }; + + /** + * When a local queue enters 'polling' mode + */ + "localQueue:setMode": { + ctx: WorkerPluginContext; + localQueue: LocalQueue; + oldMode: LocalQueueMode; + newMode: Exclude; + }; + + /** + * Too few jobs were fetched from the DB, so the local queue is going to + * sleep. + */ + "localQueue:refetchDelay:start": { + ctx: WorkerPluginContext; + localQueue: LocalQueue; + /** The number of jobs that were fetched */ + jobCount: number; + /** We needed this number or fewer jobs to trigger */ + threshold: number; + /** How long we should delay for */ + delayMs: number; + /** If we receive this number of nudges, we will abort the delay */ + abortThreshold: number; + }; + + /** + * Too many nudges happened whilst the local queue was asleep, and it has + * been awoken early to deal with the rush! + */ + "localQueue:refetchDelay:abort": { + ctx: WorkerPluginContext; + localQueue: LocalQueue; + /** How many nudges did we receive during the delay */ + count: number; + /** How many nudges did we need to receive for the abort */ + abortThreshold: number; + }; + + /** + * The refetchDelay terminated normally. 
+ */ + "localQueue:refetchDelay:expired": { + ctx: WorkerPluginContext; + localQueue: LocalQueue; + }; + + /** + * The refetchDelay terminated normally. + */ + "localQueue:getJobs:complete": { + ctx: WorkerPluginContext; + localQueue: LocalQueue; + jobs: Job[]; + }; + + /** + * The refetchDelay terminated normally. + */ + "localQueue:returnJobs": { + ctx: WorkerPluginContext; + localQueue: LocalQueue; + jobs: Job[]; + }; + /** * When a worker is created */ - "worker:create": { worker: Worker; tasks: TaskList }; + "worker:create": { + ctx: WorkerPluginContext; + worker: Worker; + tasks: TaskList; + }; /** * When a worker release is requested */ - "worker:release": { worker: Worker }; + "worker:release": { ctx: WorkerPluginContext; worker: Worker }; /** * When a worker stops (normally after a release) */ - "worker:stop": { worker: Worker; error?: unknown }; + "worker:stop": { ctx: WorkerPluginContext; worker: Worker; error?: unknown }; /** * When a worker is about to ask the database for a job to execute */ - "worker:getJob:start": { worker: Worker }; + "worker:getJob:start": { ctx: WorkerPluginContext; worker: Worker }; /** * When a worker calls get_job but there are no available jobs */ - "worker:getJob:error": { worker: Worker; error: unknown }; + "worker:getJob:error": { + ctx: WorkerPluginContext; + worker: Worker; + error: unknown; + }; /** * When a worker calls get_job but there are no available jobs */ - "worker:getJob:empty": { worker: Worker }; + "worker:getJob:empty": { ctx: WorkerPluginContext; worker: Worker }; /** * When a worker is created */ "worker:fatalError": { + ctx: WorkerPluginContext; worker: Worker; error: unknown; jobError: unknown | null; @@ -1055,17 +1172,18 @@ export type WorkerEventMap = { /** * When a job is retrieved by get_job */ - "job:start": { worker: Worker; job: Job }; + "job:start": { ctx: WorkerPluginContext; worker: Worker; job: Job }; /** * When a job completes successfully */ - "job:success": { worker: Worker; job: Job }; + "job:success": { ctx: WorkerPluginContext; worker: Worker; job: Job }; /** * When a job throws an error */ "job:error": { + ctx: WorkerPluginContext; worker: Worker; job: Job; error: unknown; @@ -1076,6 +1194,7 @@ export type WorkerEventMap = { * When a job fails permanently (emitted after job:error when appropriate) */ "job:failed": { + ctx: WorkerPluginContext; worker: Worker; job: Job; error: unknown; @@ -1086,16 +1205,22 @@ export type WorkerEventMap = { * When a job has finished executing and the result (success or failure) has * been written back to the database */ - "job:complete": { worker: Worker; job: Job; error: unknown }; + "job:complete": { + ctx: WorkerPluginContext; + worker: Worker; + job: Job; + error: unknown; + }; /** **Experimental** When the cron starts working (before backfilling) */ - "cron:starting": { cron: Cron; start: Date }; + "cron:starting": { ctx: WorkerPluginContext; cron: Cron; start: Date }; /** **Experimental** When the cron starts working (after backfilling completes) */ - "cron:started": { cron: Cron; start: Date }; + "cron:started": { ctx: WorkerPluginContext; cron: Cron; start: Date }; /** **Experimental** When a number of jobs need backfilling for a particular timestamp. */ "cron:backfill": { + ctx: WorkerPluginContext; cron: Cron; itemsToBackfill: JobAndCronIdentifierWithDetails[]; timestamp: string; @@ -1106,6 +1231,7 @@ export type WorkerEventMap = { * clock was adjusted) and we try again a little later. 
*/ "cron:prematureTimer": { + ctx: WorkerPluginContext; cron: Cron; currentTimestamp: number; expectedTimestamp: number; @@ -1117,6 +1243,7 @@ export type WorkerEventMap = { * went to sleep) and we need to catch up. */ "cron:overdueTimer": { + ctx: WorkerPluginContext; cron: Cron; currentTimestamp: number; expectedTimestamp: number; @@ -1128,6 +1255,7 @@ export type WorkerEventMap = { * database write.) */ "cron:schedule": { + ctx: WorkerPluginContext; cron: Cron; timestamp: number; jobsAndIdentifiers: JobAndCronIdentifier[]; @@ -1139,6 +1267,7 @@ export type WorkerEventMap = { * database write.) */ "cron:scheduled": { + ctx: WorkerPluginContext; cron: Cron; timestamp: number; jobsAndIdentifiers: JobAndCronIdentifier[]; @@ -1149,6 +1278,7 @@ export type WorkerEventMap = { * (currently every 8-10 minutes) */ "resetLocked:started": { + ctx: WorkerPluginContext; /** @internal Not sure this'll stay on pool */ workerPool: WorkerPool; }; @@ -1158,6 +1288,7 @@ export type WorkerEventMap = { * successfully. */ "resetLocked:success": { + ctx: WorkerPluginContext; /** * The number of milliseconds until resetLocked runs again (or null if we * won't because the pool is exiting) @@ -1172,6 +1303,7 @@ export type WorkerEventMap = { * **Experimental** When the `resetLocked` process has failed. */ "resetLocked:failure": { + ctx: WorkerPluginContext; error: Error; /** @@ -1187,21 +1319,35 @@ export type WorkerEventMap = { /** * When the runner is terminated by a signal */ - gracefulShutdown: { signal: Signal }; + gracefulShutdown: { ctx: WorkerPluginContext; signal: Signal }; /** * When the runner is terminated by a signal _again_ after 5 seconds */ - forcefulShutdown: { signal: Signal }; + forcefulShutdown: { ctx: WorkerPluginContext; signal: Signal }; /** * When the runner is stopped */ - stop: Record; + stop: { ctx: WorkerPluginContext }; }; export type WorkerEvents = TypedEventEmitter; +export type GlobalEventMap = { + /** + * When the runner is terminated by a signal + */ + gracefulShutdown: { signal: Signal }; + + /** + * When the runner is terminated by a signal _again_ after 5 seconds + */ + forcefulShutdown: { signal: Signal }; +}; + +export type GlobalEvents = TypedEventEmitter; + /** * The digest of a timestamp into the component parts that a cron schedule cares about. 
*/ @@ -1227,20 +1373,40 @@ export interface FileDetails { export type Writeable = { -readonly [P in keyof T]: T[P] }; -export interface WorkerPluginContext { +// The options available before we connect to the database +export interface WorkerPluginBaseContext { version: string; - maxMigrationNumber: number; - breakingMigrationNumbers: number[]; - events: WorkerEvents; - logger: Logger; + resolvedPreset: ResolvedWorkerPreset; workerSchema: string; escapedWorkerSchema: string; - /** @internal */ - _rawOptions: SharedOptions; + events: WorkerEvents; + logger: Logger; +} +// Once we've connected to the DB, we know more +export interface WorkerPluginContext extends WorkerPluginBaseContext { hooks: AsyncHooks; - resolvedPreset: ResolvedWorkerPreset; + middleware: Middleware; + maxMigrationNumber: number; + breakingMigrationNumbers: number[]; } export type GetJobFunction = ( workerId: string, flagsToSkip: string[] | null, -) => Promise; +) => PromiseOrDirect; + +export type CompleteJobFunction = (job: DbJob) => void; +export type FailJobFunction = (spec: { + job: DbJob; + message: string; + replacementPayload: undefined | unknown[]; +}) => void; + +export const LocalQueueModes = { + STARTING: "STARTING", + POLLING: "POLLING", + WAITING: "WAITING", + TTL_EXPIRED: "TTL_EXPIRED", + RELEASED: "RELEASED", +} as const; + +export type LocalQueueMode = keyof typeof LocalQueueModes; diff --git a/src/lib.ts b/src/lib.ts index e79bc8b1..8388455d 100644 --- a/src/lib.ts +++ b/src/lib.ts @@ -1,6 +1,12 @@ import * as assert from "assert"; -import { EventEmitter } from "events"; -import { applyHooks, AsyncHooks, resolvePresets } from "graphile-config"; +import EventEmitter from "events"; +import { + applyHooks, + AsyncHooks, + Middleware, + orderedApply, + resolvePreset, +} from "graphile-config"; import { Client, Pool, PoolClient, PoolConfig } from "pg"; import { makeWorkerPresetWorkerOptions } from "./config"; @@ -15,13 +21,13 @@ import { RunOnceOptions, SharedOptions, WithPgClient, - WorkerEvents, WorkerOptions, + WorkerPluginBaseContext, WorkerPluginContext, WorkerSharedOptions, WorkerUtilsOptions, } from "./interfaces"; -import { Logger, LogScope } from "./logger"; +import { LogScope } from "./logger"; import { migrate } from "./migrate"; import { WorkerPreset } from "./preset"; import { version } from "./version"; @@ -46,24 +52,14 @@ export type ResolvedWorkerPreset = GraphileConfig.ResolvedPreset & { }; // NOTE: when you add things here, you may also want to add them to WorkerPluginContext -export interface CompiledSharedOptions< - T extends SharedOptions = SharedOptions, -> { - version: string; - maxMigrationNumber: number; - breakingMigrationNumbers: number[]; - events: WorkerEvents; - logger: Logger; - workerSchema: string; - escapedWorkerSchema: string; +export interface CompiledSharedOptions + extends WorkerPluginContext { /** * DO NOT USE THIS! As we move over to presets this will be removed. 
* * @internal */ _rawOptions: T; - resolvedPreset: ResolvedWorkerPreset; - hooks: AsyncHooks; } interface ProcessSharedOptionsSettings { @@ -208,11 +204,24 @@ export function processSharedOptions< | CompiledSharedOptions | undefined; if (!compiled) { - const resolvedPreset = resolvePresets([ - WorkerPreset, - // Explicit options override the preset - legacyOptionsToPreset(options), - ]) as ResolvedWorkerPreset; + const resolvedPreset = resolvePreset({ + extends: [ + WorkerPreset, + // Explicit options override the preset + legacyOptionsToPreset(options), + ], + }) as ResolvedWorkerPreset; + + const middleware = new Middleware(); + + orderedApply( + resolvedPreset.plugins, + (plugin) => plugin.worker?.middleware, + (name, fn, _plugin) => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + middleware.register(name, fn as any); + }, + ); const { worker: { @@ -222,50 +231,59 @@ export function processSharedOptions< logger, events = new EventEmitter(), }, + plugins, } = resolvedPreset; - const escapedWorkerSchema = Client.prototype.escapeIdentifier(workerSchema); - if ( - !Number.isFinite(minResetLockedInterval) || - !Number.isFinite(maxResetLockedInterval) || - minResetLockedInterval < 1 || - maxResetLockedInterval < minResetLockedInterval - ) { - throw new Error( - `Invalid values for minResetLockedInterval (${minResetLockedInterval})/maxResetLockedInterval (${maxResetLockedInterval})`, - ); - } - const hooks = new AsyncHooks(); - compiled = { + + const ctx: WorkerPluginBaseContext = { version, - maxMigrationNumber: MAX_MIGRATION_NUMBER, - breakingMigrationNumbers: BREAKING_MIGRATIONS, - events, - logger, + resolvedPreset, workerSchema, escapedWorkerSchema, - _rawOptions: options, - hooks, - resolvedPreset, + events, + logger, }; - applyHooks( - resolvedPreset.plugins, - (p) => p.worker?.hooks, - (name, fn, plugin) => { - const context: WorkerPluginContext = compiled!; - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const cb = ((...args: any[]) => fn(context, ...args)) as any; - cb.displayName = `${plugin.name}_hook_${name}`; - hooks.hook(name, cb); - }, - ); - _sharedOptionsCache.set(options, compiled); - Promise.resolve(hooks.process("init")).catch((error) => { - logger.error( - `One of the plugins you are using raised an error during 'init'; but errors during 'init' are currently ignored. Continuing. Error: ${error}`, - { error }, + + compiled = middleware.run("init", { ctx }, () => { + if ( + !Number.isFinite(minResetLockedInterval) || + !Number.isFinite(maxResetLockedInterval) || + minResetLockedInterval < 1 || + maxResetLockedInterval < minResetLockedInterval + ) { + throw new Error( + `Invalid values for minResetLockedInterval (${minResetLockedInterval})/maxResetLockedInterval (${maxResetLockedInterval})`, + ); + } + const hooks = new AsyncHooks(); + const compiled: CompiledSharedOptions = Object.assign(ctx, { + hooks, + middleware, + maxMigrationNumber: MAX_MIGRATION_NUMBER, + breakingMigrationNumbers: BREAKING_MIGRATIONS, + _rawOptions: options, + }); + applyHooks( + plugins, + (p) => p.worker?.hooks, + (name, fn, plugin) => { + const context: WorkerPluginContext = compiled!; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const cb = ((...args: any[]) => fn(context, ...args)) as any; + cb.displayName = `${plugin.name}_hook_${name}`; + hooks.hook(name, cb); + }, ); - }); + _sharedOptionsCache.set(options, compiled); + // 'init' hook is deprecated; use middleware instead. 
+ Promise.resolve(hooks.process("init")).catch((error) => { + logger.error( + `One of the plugins you are using raised an error during 'init'; but errors during 'init' are currently ignored. Continuing. Error: ${error}`, + { error }, + ); + }); + return compiled; + }) as CompiledSharedOptions; } if (scope) { return { @@ -516,13 +534,20 @@ export function tryParseJson( } /** @see {@link https://www.postgresql.org/docs/current/mvcc-serialization-failure-handling.html} */ -const RETRYABLE_ERROR_CODES = [ - { code: "40001", backoffMS: 50 }, // serialization_failure - { code: "40P01", backoffMS: 50 }, // deadlock_detected - { code: "57P03", backoffMS: 3000 }, // cannot_connect_now - { code: "EHOSTUNREACH", backoffMS: 3000 }, // no connection to the server - { code: "ETIMEDOUT", backoffMS: 3000 }, // timeout -]; +export const RETRYABLE_ERROR_CODES: Record< + string, + Omit | undefined +> = { + // @ts-ignore + __proto__: null, + + "40001": { minDelay: 50, maxDelay: 5_000 }, // serialization_failure + "40P01": { minDelay: 50, maxDelay: 5_000 }, // deadlock_detected + "57P03": { minDelay: 3000, maxDelay: 120_000 }, // cannot_connect_now + EHOSTUNREACH: { minDelay: 3000, maxDelay: 120_000 }, // no connection to the server + ETIMEDOUT: { minDelay: 3000, maxDelay: 120_000 }, // timeout +}; + const MAX_RETRIES = 100; export function makeEnhancedWithPgClient( @@ -542,13 +567,17 @@ export function makeEnhancedWithPgClient( return await withPgClient(...args); } catch (rawE) { const e = coerceError(rawE); - const retryable = RETRYABLE_ERROR_CODES.find( - ({ code }) => code === e.code, - ); + const retryable = RETRYABLE_ERROR_CODES[e.code as string]; if (retryable) { lastError = e; + const delay = calculateDelay(attempts, { + maxAttempts: MAX_RETRIES, + minDelay: retryable.minDelay, + maxDelay: retryable.maxDelay, + multiplier: 1.5, + }); // Try again in backoffMS - await sleep(retryable.backoffMS * Math.sqrt(attempts + 1)); + await sleep(delay); } else { throw e; } @@ -579,3 +608,27 @@ export function isPromiseLike(v: PromiseLike | T): v is PromiseLike { // eslint-disable-next-line @typescript-eslint/no-explicit-any return v != null && typeof (v as any).then === "function"; } + +export interface RetryOptions { + maxAttempts: number; + /** Minimum delay between attempts (milliseconds); can actually be half this due to jitter */ + minDelay: number; + /** Maximum delay between attempts (milliseconds) - can actually be 1.5x this due to jitter */ + maxDelay: number; + /** `multiplier ^ attempts` */ + multiplier: number; +} + +export function calculateDelay( + previousAttempts: number, + retryOptions: RetryOptions, +) { + const { minDelay = 200, maxDelay = 30_000, multiplier = 1.5 } = retryOptions; + /** Prevent the thundering herd problem by offsetting randomly */ + const jitter = Math.random(); + + return ( + Math.min(minDelay * Math.pow(multiplier, previousAttempts), maxDelay) * + (0.5 + jitter) + ); +} diff --git a/src/localQueue.ts b/src/localQueue.ts new file mode 100644 index 00000000..7cc2c1f7 --- /dev/null +++ b/src/localQueue.ts @@ -0,0 +1,891 @@ +import assert from "assert"; + +import { + CompiledSharedOptions, + EnhancedWithPgClient, + LocalQueueMode, + LocalQueueModes, + WorkerPoolOptions, +} from "."; +import { MINUTE, SECOND } from "./cronConstants"; +import defer, { Deferred } from "./deferred"; +import { GetJobFunction, Job, TaskList, WorkerPool } from "./interfaces"; +import { + calculateDelay, + coerceError, + RETRYABLE_ERROR_CODES, + RetryOptions, + sleep, +} from "./lib"; +import { 
batchGetJobs } from "./sql/getJobs"; +import { returnJobs } from "./sql/returnJobs"; + +const RETURN_JOBS_RETRY_OPTIONS: RetryOptions = { + maxAttempts: 20, + minDelay: 200, + maxDelay: 30_000, + multiplier: 1.5, +}; + +const { STARTING, POLLING, WAITING, TTL_EXPIRED, RELEASED } = LocalQueueModes; + +/** + * The local queue exists to reduce strain on the database; it works by + * fetching a batch of jobs from the database and distributing them to workers + * as and when necessary. It is also responsible for polling when in use, + * relieving the workers of this responsibility. + * + * The local queue trades latency for throughput: jobs may sit in the local + * queue for a longer time (maximum `localQueue.size` jobs waiting maximum + * `localQueue.ttl` milliseconds), but fewer requests to the database are made + * for jobs since more jobs are fetched at once, enabling the worker to reach + * higher levels of performance (and reducing read stress on the DB). + * + * The local queue is always in one of these modes: + * + * - STARTING mode + * - POLLING mode + * - WAITING mode + * - TTL_EXPIRED mode + * - RELEASED mode + * + * ## STARTING mode + * + * STARTING mode is the initial state of the local queue. + * + * Immediately move to POLLING mode. + * + * ## POLLING mode + * + * The queue will only be in POLLING mode when it contains no cached jobs. + * + * When the queue enters POLLING mode: + * + * - if any refetch delay has expired it will trigger a fetch of jobs from the + * database, + * - otherwise it will trigger a refetch to happen once the refetch delay has + * completed. + * + * When jobs are fetched: + * + * - if fewer than `Math.ceil(Math.min(localQueueRefetchDelay.threshold, localQueueSize))` + * jobs were returned then a refetch delay will be set (if configured). + * - if jobs were returned then it will supply as many as possible to any + * waiting workers (`workerQueue`) + * - if all workers are busy and jobs still remain it will store them to + * `jobQueue` and immediately enter WAITING mode + * - otherwise (if no jobs remain: `jobQueue` is empty) we'll wait + * `pollInterval` ms and then fetch again. + * + * When a "new job" notification is received, once any required refetch delay + * has expired (or immediately if it has already expired) the timer will be + * cancelled, and a fetch will be fired immediately. + * + * ## WAITING mode + * + * The local queue can only be in WAITING mode if there are cached jobs. + * + * Any waiting clients are issued any available cached jobs. + * + * If no cached jobs remain, then the local queue enters POLLING mode, + * triggering a fetch. + * + * If cached jobs remain (even if there's just one, even if it has been 30 + * minutes since the last fetch) then the local queue continues to wait for + * a worker to claim the remaining jobs. Once no jobs remain, the local queue + * reverts to POLLING mode, triggering a fetch. + * + * In WAITING mode, all "new job" announcements are ignored. + * + * The local queue can be in WAITING mode for at most `getJobBatchTime` + * milliseconds (default: 30 minutes), after which all unclaimed jobs are + * returned to the pool and the local queue enters TTL_EXPIRED mode. + * + * ## TTL_EXPIRED mode + * + * This mode is used when jobs were queued in WAITING mode for too long. The + * local queue will sit in TTL_EXPIRED mode until a worker asks for a job, + * whereupon the local queue will enter POLLING mode (triggering a fetch). + * + * ## RELEASED mode + * + * Triggered on shutdown. 
+ */ +export class LocalQueue { + /** + * The configured time (in milliseconds) that a job may sit unclaimed in the + * local queue before being returned to the database. + */ + readonly ttl: number; + + /** + * The time interval (in milliseconds) between fetch requests when in + * `POLLING` mode. + */ + readonly pollInterval: number; + + /** + * The jobs that have been pulled from the database that are waiting for a + * worker to claim them. Once claimed, a job will be removed from this list. + * This should be empty in POLLING and TTL_EXPIRED modes. + */ + + readonly jobQueue: Job[] = []; + /** + * Workers waiting for jobs are represented by deferred promises in this + * list. When a job becomes available, first it attempts to satisfy one of + * these from the workerQueue, and only if this is empty does it then add the + * job to the `jobQueue`. + */ + readonly workerQueue: Deferred[] = []; + + /** + * Are we currently fetching jobs from the DB? Prevents double-fetches. + */ + fetchInProgress = false; + + /** + * When we enter WAITING mode (i.e. there are jobs in `jobQueue`), we set up + * this timer. When the timer fires, we will release any remaining jobs in + * jobQueue back to the database (and enter TTL_EXPIRED mode). Note: all jobs + * are fetched at once, and no further jobs are fetched, so the TTL for all + * jobs will expire at the same time - we'll only return to POLLING mode once + * all jobs have been executed. + */ + ttlExpiredTimer: NodeJS.Timeout | null = null; + + /** + * The timer associated with the next fetch poll (see also `pollInterval`). + */ + fetchTimer: NodeJS.Timeout | null = null; + + /** + * Should we fetch again once the current fetch is complete? This is + * generally used to indicate that we received a "new job" notification (the + * queue is "pulsed") whilst we were already fetching, so our fetch may not + * have included that job. + */ + fetchAgain = false; + + /** + * The mode that the queue is in; must only be changed via `setMode`, which + * itself must only be called by the `setMode*()` methods. + */ + public readonly mode: LocalQueueMode = STARTING; + + /** + * The promise that resolves/rejects when the local queue has been released. + * Will not resolve until all locally queued jobs have been returned to the + * pool (or may reject if this process fails) and all active fetches and + * other background tasks are complete. This is important, otherwise we might + * release the pg.Pool that we're using before jobs are returned to the + * database, which would be something we couldn't recover from! + * + * If it rejects, may reject with a regular Error or an AggregateError + * representing multiple failures. + */ + private _finPromise = defer(); + + /** + * Errors that occurred causing the shutdown or during the shutdown of this + * local queue instance. + */ + private errors: Error[] = []; + + /** + * A count of the number of "background" processes such as fetching or + * returning jobs such that we can avoid exiting until all background tasks + * have completed. + */ + private backgroundCount = 0; + + /** + * If `localQueueRefetchDelay` is configured; set this true if the fetch + * resulted in a queue size lower than the threshold. + */ + private refetchDelayActive = false; + + /** + * If true, when the refetch delay expires in POLLING mode (or when we next + * enter POLLING mode after it expires), immediately trigger a fetch. If + * false, just wait for the regular POLLING timeouts. 
+ */ + private refetchDelayFetchOnComplete = false; + + /** The timer tracking when the refetch delay has expired. */ + private refetchDelayTimer: NodeJS.Timeout | null = null; + + /** + * The number of new jobs received during the fetch or the resulting refetch + * delay; see also `refetchDelayAbortThreshold`. + */ + private refetchDelayCounter: number = 0; + + /** + * A random number between 0 and either + * `preset.worker.localQueue.refetchDelay.maxAbortThreshold` or + * `5*preset.worker.localQueue.size`; when we've been informed of this many + * jobs via pulse(), we must abort the refetch delay and trigger an immediate + * fetch. + */ + private refetchDelayAbortThreshold: number = Infinity; + + constructor( + private readonly compiledSharedOptions: CompiledSharedOptions, + private readonly tasks: TaskList, + private readonly withPgClient: EnhancedWithPgClient, + public readonly workerPool: WorkerPool, + /** How many jobs to fetch at once */ + private readonly getJobBatchSize: number, + /** + * If false, exit once the DB seems to have been exhausted of jobs, even if + * for just a moment. (I.e. `runOnce()`) + */ + private readonly continuous: boolean, + ) { + this.ttl = + compiledSharedOptions.resolvedPreset.worker.localQueue?.ttl ?? 5 * MINUTE; + this.pollInterval = + compiledSharedOptions.resolvedPreset.worker.pollInterval ?? 2 * SECOND; + const localQueueRefetchDelayDuration = + compiledSharedOptions.resolvedPreset.worker.localQueue?.refetchDelay + ?.durationMs; + if ( + localQueueRefetchDelayDuration != null && + localQueueRefetchDelayDuration > this.pollInterval + ) { + throw new Error( + `Invalid configuration; 'preset.worker.localQueue.refetchDelay.durationMs' (${localQueueRefetchDelayDuration}) must not be larger than 'preset.worker.pollInterval' (${this.pollInterval})`, + ); + } + compiledSharedOptions.events.emit("localQueue:init", { + ctx: compiledSharedOptions, + localQueue: this, + }); + // Immediately enter polling mode. + this.setModePolling(); + } + + /** + * Only call this from `setMode*()` helpers. + */ + private setMode( + newMode: Exclude, + ) { + const oldMode = this.mode; + // Override the 'readonly' + (this.mode as LocalQueueMode) = newMode; + this.compiledSharedOptions.events.emit("localQueue:setMode", { + ctx: this.compiledSharedOptions, + localQueue: this, + oldMode, + newMode, + }); + } + + /** + * Called when the LocalQueue is completely finished and released: no + * background tasks, no jobs in job queue. Resolves (or rejects) + * `_finPromise`. + */ + private fin() { + assert.equal(this.mode, "RELEASED"); + assert.equal(this.backgroundCount, 0); + assert.equal(this.jobQueue.length, 0); + if (this.errors.length === 1) { + this._finPromise.reject(this.errors[0]); + } else if (this.errors.length > 1) { + this._finPromise.reject( + new AggregateError(this.errors, "Worker did not exit cleanly"), + ); + } else { + this._finPromise.resolve(); + } + } + + private decreaseBackgroundCount = () => { + this.backgroundCount--; + if (this.mode === "RELEASED" && this.backgroundCount === 0) { + this.fin(); + } + }; + + private decreaseBackgroundCountWithError = (e: unknown) => { + this.backgroundCount--; + if (this.mode === "RELEASED") { + this.errors.push(coerceError(e)); + if (this.backgroundCount === 0) { + this.fin(); + } + } else { + // If we're not shutting down, view this as a temporary error (but give + // Benjie a wrist slap anyway). 
+ this.compiledSharedOptions.logger.error( + `GraphileWorkerInternalError: Backgrounding should never yield errors when the queue is not RELEASED`, + { error: e }, + ); + } + }; + + /** + * Track promises that happen in the background, but that we want to ensure are + * handled before we release the queue (so that the database pool isn't + * released too early). + * + * IMPORTANT: never raise an error from background unless mode === "RELEASED" - you + * need to handle errors yourself! + */ + private background(promise: Promise) { + if (this.mode === "RELEASED" && this.backgroundCount === 0) { + throw new Error( + `Cannot background something when the queue is already released`, + ); + } + this.backgroundCount++; + promise.then( + this.decreaseBackgroundCount, + this.decreaseBackgroundCountWithError, + ); + } + + private setModePolling() { + assert.ok( + !this.fetchTimer, + "Cannot enter polling mode when a fetch is scheduled", + ); + assert.ok( + !this.fetchInProgress, + "Cannot enter polling mode when fetch is in progress", + ); + assert.equal( + this.jobQueue.length, + 0, + "Cannot enter polling mode when job queue isn't empty", + ); + + // There's no jobs, so there's no need for ttlExpired timer any more. + if (this.ttlExpiredTimer) { + clearTimeout(this.ttlExpiredTimer); + this.ttlExpiredTimer = null; + } + + this.setMode(POLLING); + + // This won't necessarily fetch, it will respect refetchDelay + this.fetch(); + } + + private setModeWaiting(causedByErrorHandling = false) { + if (!causedByErrorHandling) { + // Can only enter WAITING mode from POLLING mode. + assert.equal(this.mode, POLLING); + } + assert.ok( + !this.fetchTimer, + "Cannot enter waiting mode when a fetch is scheduled", + ); + assert.ok( + !this.fetchInProgress, + "Cannot enter waiting mode when fetch is in progress", + ); + assert.equal( + this.workerQueue.length, + 0, + "Cannot enter waiting mode when the worker queue is not empty", + ); + assert.notEqual( + this.jobQueue.length, + 0, + "Cannot enter waiting mode when job queue is empty", + ); + + if (this.ttlExpiredTimer) { + clearTimeout(this.ttlExpiredTimer); + } + + this.setMode(WAITING); + + this.ttlExpiredTimer = setTimeout(this.setModeTtlExpired, this.ttl); + } + + private setModeTtlExpired = () => { + // Can only enter TTL_EXPIRED mode from WAITING mode. 
+ assert.equal(this.mode, WAITING); + assert.ok( + !this.fetchTimer, + "Cannot enter TTL expired mode when a fetch is scheduled", + ); + assert.ok( + !this.fetchInProgress, + "Cannot enter TTL expired mode when fetch is in progress", + ); + assert.notEqual( + this.jobQueue.length, + 0, + "Cannot enter TTL expired mode when job queue is empty", + ); + + if (this.ttlExpiredTimer) { + clearTimeout(this.ttlExpiredTimer); + this.ttlExpiredTimer = null; + } + + this.setMode(TTL_EXPIRED); + + // Return jobs to the pool + this.returnJobs(); + }; + + private returnJobs() { + const l = this.jobQueue.length; + if (l === 0) { + return; + } + const jobsToReturn = this.jobQueue.splice(0, l); + + this.compiledSharedOptions.events.emit("localQueue:returnJobs", { + ctx: this.compiledSharedOptions, + localQueue: this, + jobs: jobsToReturn, + }); + + let attempts = 1; + let initialError: Error; + const { maxAttempts } = RETURN_JOBS_RETRY_OPTIONS; + const onError = (e: unknown): void | Promise => { + const lastError = coerceError(e); + if (attempts === 1) { + initialError = lastError; + } + + this.compiledSharedOptions.logger.error( + `Failed to return jobs from local queue to database queue (attempt ${attempts}/${maxAttempts})`, + { + error: e, + attempts, + returnJobsRetryOptions: RETURN_JOBS_RETRY_OPTIONS, + }, + ); + + // NOTE: the mode now may not be the mode that we were in when + // returnJobs was called. An error happened... we need to deal with + // this error gracefully. + switch (this.mode) { + case "RELEASED": { + throw new Error( + `Error occurred whilst returning jobs from local queue to database queue: ${initialError.message}`, + ); + } + + // NOTE: considered doing `this.receivedJobs(jobsToReturn)`; but I + // simply trying to release them again seems safer and more correct. + default: { + if (attempts < maxAttempts) { + const code = lastError?.code as string; + const retryable = RETRYABLE_ERROR_CODES[code]; + const delay = calculateDelay(attempts - 1, { + ...RETURN_JOBS_RETRY_OPTIONS, + // NOTE: `retryable` might be undefined, in which case `RETURN_JOBS_RETRY_OPTIONS` wins + ...retryable, + }); + + // Be sure to increment attempts to avoid infinite loop! + ++attempts; + return sleep(delay).then(() => + returnJobs( + this.compiledSharedOptions, + this.withPgClient, // We'll handle the retries via onError + this.workerPool.id, + jobsToReturn, + ).then(noop, onError), + ); + } else { + // TODO: is this the correct way to handle this? Are we allowed to + // trigger shut down internally? + this.release(); + // Now we're in release mode, throwing the error will be tracked + // automatically by `this.background()` + throw new Error( + `Error occurred whilst returning jobs from local queue to database queue; aborting after ${attempts} attempts. Initial error: ${initialError.message}`, + ); + } + } + } + }; + + // NOTE: the `this.background` call covers all of the re-attempts via + // `onError` above, since `onError` returns the next promise each time. 
+ this.background( + returnJobs( + this.compiledSharedOptions, + this.withPgClient, // We'll handle the retries via onError + this.workerPool.id, + jobsToReturn, + ).then( + noop, // No action necessary on success + onError, + ), + ); + } + + private receivedJobs(jobs: Job[]) { + const jobCount = jobs.length; + const workerCount = Math.min(jobCount, this.workerQueue.length); + const workers = this.workerQueue.splice(0, workerCount); + for (let i = 0; i < jobCount; i++) { + const job = jobs[i]; + if (i < workerCount) { + workers[i].resolve(job); + } else { + this.jobQueue.push(job); + } + } + } + + private fetch = (): void => { + if (this.fetchTimer) { + clearTimeout(this.fetchTimer); + this.fetchTimer = null; + } + if (this.refetchDelayActive) { + this.refetchDelayFetchOnComplete = true; + return; + } + this.background( + this._fetch().catch((e) => { + // This should not happen + this.compiledSharedOptions.logger.error(`Error occurred during fetch`, { + error: e, + }); + }), + ); + }; + + private async _fetch() { + /** + * Did we fetch the maximum number of records that we could? (If so, we + * should consider fetching again straight away so there's always jobs to + * be done.) + */ + let fetchedMax = false; + /** + * Did we fetch more jobs than the refetch delay threshold? (Greater than, + * not equal to.) If false, we should start a refetch delay. + * + * Initialized to `true` so on error we don't enable refetch delay. + */ + let refetchDelayThresholdSurpassed = true; + /** How many jobs did we fetch? (Initialize to zero in case of error.) */ + let jobCount = 0; + const refetchDelayOptions = + this.compiledSharedOptions.resolvedPreset.worker.localQueue?.refetchDelay; + try { + assert.equal(this.mode, POLLING, "Can only fetch when in polling mode"); + assert.equal( + this.fetchInProgress, + false, + "Cannot fetch when a fetch is already in progress", + ); + assert.equal( + this.refetchDelayActive, + false, + "Can not fetch when fetches are meant to be delayed", + ); + assert.equal( + this.jobQueue.length, + 0, + "Should not fetch when job queue isn't empty", + ); + this.fetchAgain = false; + this.fetchInProgress = true; + // NOTE: this.refetchDelayCounter is set here allow for pulse() during + // fetch(). If the refetch delay threshold is surpassed then this value + // is harmlessly ignored. + this.refetchDelayCounter = 0; + + // The ONLY await in this function. + const jobs = await batchGetJobs( + this.compiledSharedOptions, + this.withPgClient, + this.tasks, + this.workerPool.id, + null, // `flagsToSkip` is not set, see `LocalQueue.getJob` + this.getJobBatchSize, + ); + + this.compiledSharedOptions.events.emit("localQueue:getJobs:complete", { + ctx: this.compiledSharedOptions, + localQueue: this, + jobs, + }); + + jobCount = jobs.length; + fetchedMax = jobCount >= this.getJobBatchSize; + refetchDelayThresholdSurpassed = + // If we've fetched the maximum, we've met the requirement + fetchedMax || + // If refetch delay is disabled, we've met the requirement + !refetchDelayOptions || + // If we fetched more than (**not** equal to) `threshold` jobs, we've met the requirement + jobCount > (refetchDelayOptions.threshold ?? 0); + + // NOTE: we don't need to handle `this.mode === RELEASED` here because + // being in that mode guarantees the workerQueue is empty. + this.receivedJobs(jobs); + } catch (e) { + // Error happened; rely on poll interval. + this.compiledSharedOptions.logger.error( + `Error occurred fetching jobs; will try again on next poll interval. 
Error: ${e}`, + { error: e }, + ); + } finally { + this.fetchInProgress = false; + } + + // Finally, now that there is no fetch in progress, choose what to do next + if (this.mode === "RELEASED") { + this.returnJobs(); + return; + } + + /** How long to avoid any refetches for */ + const refetchDelayMs = + (0.5 + Math.random()) * (refetchDelayOptions?.durationMs ?? 100); + if (!refetchDelayThresholdSurpassed) { + /** The configured abort threshold */ + const maxAbortThreshold = + refetchDelayOptions?.maxAbortThreshold ?? 5 * this.getJobBatchSize; + /** + * How many notifications do we need to receive before we abort the "no + * refetches" behavior? Note: this is not + */ + const abortThreshold = + // `|| Infinity` because if `maxAbortThreshold = Infinity` and + // `Math.random() = 0` then we'd get `NaN` (`0 * Infinity = NaN`) + Math.random() * maxAbortThreshold || Infinity; + + this.fetchAgain = false; + this.refetchDelayActive = true; + this.refetchDelayFetchOnComplete = false; + this.refetchDelayAbortThreshold = abortThreshold; + // NOTE: this.refetchDelayCounter is set at the beginning of fetch() + // (i.e. above) to allow for pulse() during fetch() + this.refetchDelayTimer = setTimeout( + this.refetchDelayCompleteOrAbort, + refetchDelayMs, + ); + this.compiledSharedOptions.events.emit("localQueue:refetchDelay:start", { + ctx: this.compiledSharedOptions, + localQueue: this, + jobCount, + threshold: refetchDelayOptions?.threshold ?? 0, + delayMs: refetchDelayMs, + abortThreshold: this.refetchDelayAbortThreshold, + }); + } + + if (this.jobQueue.length > 0) { + this.setModeWaiting(); + } else { + if (fetchedMax || this.fetchAgain) { + // Maximal fetch and all jobs instantly consumed; trigger immediate refetch + // OR: new jobs came in during fetch(); trigger immediate refetch + assert.equal( + this.refetchDelayActive, + false, + "refetchDelayActive should imply didn't fetch max and fetchAgain is false", + ); + this.fetch(); + } else if (this.continuous) { + // Set up the timer + this.fetchTimer = setTimeout(this.fetch, this.pollInterval); + } else { + this.setModeReleased(); + return; + } + } + + // In case the counter was incremented sufficiently during fetch() + this.handleCheckRefetchDelayAbortThreshold(); + } + + private refetchDelayCompleteOrAbort = (aborted = false): void => { + if (this.refetchDelayTimer != null) { + clearTimeout(this.refetchDelayTimer); + this.refetchDelayTimer = null; + } + this.refetchDelayActive = false; + + if (aborted) { + // Force refetch because we've been notified of so many jobs! + this.refetchDelayFetchOnComplete = true; + + this.compiledSharedOptions.events.emit("localQueue:refetchDelay:abort", { + ctx: this.compiledSharedOptions, + localQueue: this, + count: this.refetchDelayCounter, + abortThreshold: this.refetchDelayAbortThreshold, + }); + } else { + this.compiledSharedOptions.events.emit( + "localQueue:refetchDelay:expired", + { + ctx: this.compiledSharedOptions, + localQueue: this, + }, + ); + } + + if (this.mode === POLLING && this.refetchDelayFetchOnComplete) { + // Cancel poll, do now + if (this.fetchTimer != null) { + clearTimeout(this.fetchTimer); + this.fetchTimer = null; + } + this.fetch(); + } + }; + + /** + * If no refetch delay is active, returns false; otherwise returns true and + * checks to see if we need to abort the delay and trigger a fetch. 
+ */ + private handleCheckRefetchDelayAbortThreshold(): boolean { + if (!this.refetchDelayActive || this.mode === "RELEASED") { + return false; + } + if (this.refetchDelayCounter >= this.refetchDelayAbortThreshold) { + this.refetchDelayCompleteOrAbort(true); + } + return true; + } + + /** Called when a new job becomes available in the DB */ + public pulse(count: number) { + this.refetchDelayCounter += count; + + if (this.handleCheckRefetchDelayAbortThreshold()) { + // Refetch delay was enabled; we've incremented the counter and taken + // action if necessary. No further action necessary. + } else if (this.mode === POLLING) { + if (this.fetchInProgress) { + this.fetchAgain = true; + } else if (this.fetchTimer != null) { + clearTimeout(this.fetchTimer); + this.fetchTimer = null; + this.fetch(); + } + } + } + + // If you refactor this to be a method rather than a property, make sure that + // you `.bind(this)` to it. + public getJob: GetJobFunction = (workerId, flagsToSkip) => { + if (this.mode === RELEASED) { + return undefined; + } + + // Cannot batch if there's flags + if (flagsToSkip !== null) { + // PERF: we could actually batch for similar flags, I guess. + const jobsPromise = batchGetJobs( + this.compiledSharedOptions, + this.withPgClient, + this.tasks, + this.workerPool.id, + flagsToSkip, + 1, + ); + return jobsPromise.then((jobs) => jobs[0]); + } + + if (this.mode === TTL_EXPIRED) { + this.setModePolling(); + } + + const job = this.jobQueue.shift(); + if (job !== undefined) { + if (this.jobQueue.length === 0) { + assert.equal(this.mode, WAITING); + this.setModePolling(); + } + return job; + } else { + const d = defer(); + this.workerQueue.push(d); + return d; + } + }; + + public release() { + if (this.mode !== "RELEASED") { + this.setModeReleased(); + } + return this._finPromise; + } + + private setModeReleased() { + const oldMode = this.mode; + assert.notEqual(oldMode, RELEASED, "LocalQueue must only be released once"); + this.setMode(RELEASED); + + if (this.refetchDelayTimer != null) { + clearTimeout(this.refetchDelayTimer); + this.refetchDelayTimer = null; + } + this.refetchDelayActive = false; + + switch (oldMode) { + case POLLING: { + // Release pending workers + const futureJobs = this.workerQueue.splice(0, this.workerQueue.length); + futureJobs.forEach((futureJob) => futureJob.resolve(undefined)); + + // Release next fetch call + if (this.fetchTimer != null) { + // No need to return jobs in POLLING mode + clearTimeout(this.fetchTimer); + this.fetchTimer = null; + } else { + // There's a fetch in progress, so backgroundCount will not be 0, and + // fetch handles calling returnJobs if it completes when in RELEASED + // mode. + } + + break; + } + case WAITING: { + if (this.ttlExpiredTimer != null) { + clearTimeout(this.ttlExpiredTimer); + this.ttlExpiredTimer = null; + } + // Trigger the jobs to be released + // NOTE: this will add to backgroundCount + this.returnJobs(); + break; + } + case TTL_EXPIRED: { + // No action necessary, jobs are already returned, no jobs, no pending workers + break; + } + case STARTING: { + // From STARTING to RELEASED directly? This should never happen! + break; + } + case RELEASED: { + // Explicitly ruled against via assertion above. 
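        // Editorial sketch (not part of the patch): `defer()` is imported from
        // ./deferred and not shown in this diff. getJob() above parks a worker
        // on a deferred when no job is queued, and the POLLING branch above
        // resolves those deferreds with `undefined` on release. A minimal
        // deferred along these lines is assumed; the real implementation may
        // differ in detail.
        type DeferredSketch<T> = Promise<T> & {
          resolve: (value: T | PromiseLike<T>) => void;
          reject: (reason?: unknown) => void;
        };
        const deferSketch = <T>(): DeferredSketch<T> => {
          let resolve!: (value: T | PromiseLike<T>) => void;
          let reject!: (reason?: unknown) => void;
          const promise = new Promise<T>((res, rej) => {
            resolve = res;
            reject = rej;
          });
          return Object.assign(promise, { resolve, reject });
        };
        // (End of sketch.)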
+ break; + } + default: { + const never: never = oldMode; + throw new Error(`Unhandled mode: ${never}`); + } + } + + if (this.backgroundCount === 0) { + this.fin(); + } + } +} + +function noop() {} diff --git a/src/main.ts b/src/main.ts index ccc653b2..65a5292a 100644 --- a/src/main.ts +++ b/src/main.ts @@ -1,38 +1,55 @@ +import * as assert from "assert"; import { randomBytes } from "crypto"; import { EventEmitter } from "events"; import { Notification, Pool, PoolClient } from "pg"; import { inspect } from "util"; -import defer from "./deferred"; +import defer, { Deferred } from "./deferred"; import { makeWithPgClientFromClient, makeWithPgClientFromPool, } from "./helpers"; import { + CompleteJobFunction, EnhancedWithPgClient, + FailJobFunction, GetJobFunction, + GlobalEventMap, + GlobalEvents, Job, RunOnceOptions, TaskList, - WorkerEventMap, WorkerEvents, WorkerPool, WorkerPoolOptions, } from "./interfaces"; import { + calculateDelay, coerceError, CompiledSharedOptions, makeEnhancedWithPgClient, processSharedOptions, + RETRYABLE_ERROR_CODES, + RetryOptions, + sleep, tryParseJson, } from "./lib"; +import { LocalQueue } from "./localQueue"; import { Logger } from "./logger"; import SIGNALS, { Signal } from "./signals"; -import { failJobs } from "./sql/failJob"; -import { getJob as baseGetJob } from "./sql/getJob"; +import { batchCompleteJobs } from "./sql/completeJobs"; +import { batchFailJobs, failJobs } from "./sql/failJobs"; +import { batchGetJobs } from "./sql/getJobs"; import { resetLockedAt } from "./sql/resetLockedAt"; import { makeNewWorker } from "./worker"; +const BATCH_RETRY_OPTIONS: RetryOptions = { + maxAttempts: 20, + minDelay: 200, + maxDelay: 30_000, + multiplier: 1.5, +}; + const ENABLE_DANGEROUS_LOGS = process.env.GRAPHILE_ENABLE_DANGEROUS_LOGS === "1"; const NO_LOG_SUCCESS = !!process.env.NO_LOG_SUCCESS; @@ -50,7 +67,7 @@ export { allWorkerPools as _allWorkerPools }; * gracefulShutdown to all the pools' events; we use this event emitter to * aggregate these requests. */ -const _signalHandlersEventEmitter: WorkerEvents = new EventEmitter(); +const _signalHandlersEventEmitter: GlobalEvents = new EventEmitter(); /** * Only register the signal handlers once _globally_. @@ -72,7 +89,7 @@ let _registeredSignalHandlersCount = 0; * future calls will register the events but take no further actions. 
*/ function registerSignalHandlers( - logger: Logger, + ctx: CompiledSharedOptions, events: WorkerEvents, ): () => void { if (_shuttingDownGracefully || _shuttingDownForcefully) { @@ -81,13 +98,13 @@ function registerSignalHandlers( ); } - const gscb = (o: WorkerEventMap["gracefulShutdown"]) => - events.emit("gracefulShutdown", o); - const fscb = (o: WorkerEventMap["forcefulShutdown"]) => - events.emit("forcefulShutdown", o); + const gscb = (o: GlobalEventMap["gracefulShutdown"]) => + events.emit("gracefulShutdown", { ctx, ...o }); + const fscb = (o: GlobalEventMap["forcefulShutdown"]) => + events.emit("forcefulShutdown", { ctx, ...o }); if (!_registeredSignalHandlers) { - _reallyRegisterSignalHandlers(logger); + _reallyRegisterSignalHandlers(ctx.logger); } _registeredSignalHandlersCount++; @@ -164,16 +181,18 @@ function _reallyRegisterSignalHandlers(logger: Logger) { allWorkerPools.map((pool) => pool.gracefulShutdown(`Graceful worker shutdown due to ${signal}`), ), - ).finally(() => { - clearTimeout(switchTimeout); - process.removeListener(signal, gracefulHandler); - if (!_shuttingDownForcefully) { - logger.info( - `Global graceful shutdown complete; killing self via ${signal}`, - ); - process.kill(process.pid, signal); - } - }); + ) + .finally(() => { + clearTimeout(switchTimeout); + process.removeListener(signal, gracefulHandler); + if (!_shuttingDownForcefully) { + logger.info( + `Global graceful shutdown complete; killing self via ${signal}`, + ); + process.kill(process.pid, signal); + } + }) + .catch(noop); }; const forcefulHandler = function (signal: Signal) { if (_shuttingDownForcefully) { @@ -195,14 +214,16 @@ function _reallyRegisterSignalHandlers(logger: Logger) { allWorkerPools.map((pool) => pool.forcefulShutdown(`Forced worker shutdown due to ${signal}`), ), - ).finally(() => { - removeForcefulHandler(); - clearTimeout(removeTimeout); - logger.error( - `Global forceful shutdown completed; killing self via ${signal}`, - ); - process.kill(process.pid, signal); - }); + ) + .finally(() => { + removeForcefulHandler(); + clearTimeout(removeTimeout); + logger.error( + `Global forceful shutdown completed; killing self via ${signal}`, + ); + process.kill(process.pid, signal); + }) + .catch(noop); }; logger.debug( @@ -246,6 +267,7 @@ export function runTaskListInternal( tasks: TaskList, pgPool: Pool, ): WorkerPool { + const ctx = compiledSharedOptions; const { events, logger, @@ -313,10 +335,10 @@ export function runTaskListInternal( resetLockedAtPromise = undefined; if (workerPool._active) { const delay = resetLockedDelay(); - events.emit("resetLocked:success", { workerPool, delay }); + events.emit("resetLocked:success", { ctx, workerPool, delay }); resetLockedTimeout = setTimeout(resetLocked, delay); } else { - events.emit("resetLocked:success", { workerPool, delay: null }); + events.emit("resetLocked:success", { ctx, workerPool, delay: null }); } }, (e) => { @@ -325,6 +347,7 @@ export function runTaskListInternal( if (workerPool._active) { const delay = resetLockedDelay(); events.emit("resetLocked:failure", { + ctx, workerPool, error: e, delay, @@ -338,6 +361,7 @@ export function runTaskListInternal( ); } else { events.emit("resetLocked:failure", { + ctx, workerPool, error: e, delay: null, @@ -351,7 +375,7 @@ export function runTaskListInternal( } }, ); - events.emit("resetLocked:started", { workerPool }); + events.emit("resetLocked:started", { ctx, workerPool }); }; // Reset locked in the first 60 seconds, not immediately because we don't @@ -373,7 +397,7 @@ export function 
runTaskListInternal( } const reconnectWithExponentialBackoff = (err: Error) => { - events.emit("pool:listen:error", { workerPool, error: err }); + events.emit("pool:listen:error", { ctx, workerPool, error: err }); attempts++; @@ -395,7 +419,7 @@ export function runTaskListInternal( reconnectTimeout = setTimeout(() => { reconnectTimeout = null; - events.emit("pool:listen:connecting", { workerPool, attempts }); + events.emit("pool:listen:connecting", { ctx, workerPool, attempts }); pgPool.connect(listenForChanges); }, delay); }; @@ -435,6 +459,7 @@ export function runTaskListInternal( function handleNotification(message: Notification) { if (changeListener?.client === client && !workerPool._shuttingDown) { events.emit("pool:listen:notification", { + ctx, workerPool, message, client, @@ -444,10 +469,9 @@ export function runTaskListInternal( const payload = tryParseJson<{ count: number; }>(message.payload); - let n = payload?.count ?? 1; + const n = payload?.count ?? 1; if (n > 0) { - // Nudge up to `n` workers - workerPool._workers.some((worker) => worker.nudge() && --n <= 0); + workerPool.nudge(n); } break; } @@ -480,7 +504,7 @@ export function runTaskListInternal( client.removeListener("notification", handleNotification); // TODO: ideally we'd only stop handling errors once all pending queries are complete; but either way we shouldn't try again! client.removeListener("error", onErrorReleaseClientAndTryAgain); - events.emit("pool:listen:release", { workerPool, client }); + events.emit("pool:listen:release", { ctx, workerPool, client }); try { await client.query( 'UNLISTEN "jobs:insert"; UNLISTEN "worker:migrate";', @@ -500,7 +524,7 @@ export function runTaskListInternal( //---------------------------------------- changeListener = { client, release }; - events.emit("pool:listen:success", { workerPool, client }); + events.emit("pool:listen:success", { ctx, workerPool, client }); client.on("notification", handleNotification); // Subscribe to jobs:insert message @@ -521,7 +545,7 @@ export function runTaskListInternal( }; // Create a client dedicated to listening for new jobs. - events.emit("pool:listen:connecting", { workerPool, attempts }); + events.emit("pool:listen:connecting", { ctx, workerPool, attempts }); pgPool.connect(listenForChanges); return workerPool; @@ -543,9 +567,16 @@ export function _runTaskList( onTerminate?: () => Promise | void; }, ): WorkerPool { + const ctx = compiledSharedOptions; const { resolvedPreset: { - worker: { concurrentJobs: baseConcurrency, gracefulShutdownAbortTimeout }, + worker: { + concurrentJobs: baseConcurrency, + gracefulShutdownAbortTimeout, + localQueue: { size: localQueueSize = -1 } = {}, + completeJobBatchDelay = -1, + failJobBatchDelay = -1, + }, }, _rawOptions: { noHandleSignals = false }, } = compiledSharedOptions; @@ -556,8 +587,9 @@ export function _runTaskList( onTerminate, onDeactivate, } = options; + let autostart = rawAutostart; - const { logger, events } = compiledSharedOptions; + const { logger, events, middleware } = compiledSharedOptions; if (ENABLE_DANGEROUS_LOGS) { logger.debug( @@ -566,35 +598,125 @@ export function _runTaskList( ); } + if (localQueueSize > 0 && localQueueSize < concurrency) { + logger.warn( + `Your job batch size (${localQueueSize}) is smaller than your concurrency setting (${concurrency}); this may result in drastically lower performance if your jobs can complete quickly. 
Please update to \`localQueueSize: ${concurrency}\` to improve performance, or \`localQueueSize: -1\` to disable batching.`, + ); + } + let unregisterSignalHandlers: (() => void) | undefined = undefined; if (!noHandleSignals) { // Clean up when certain signals occur - unregisterSignalHandlers = registerSignalHandlers(logger, events); + unregisterSignalHandlers = registerSignalHandlers( + compiledSharedOptions, + events, + ); } - const promise = defer(); + /* Errors that should be raised from the workerPool.promise (i.e. _finPromise) */ + const _finErrors: Error[] = []; + const _finPromise = defer(); + + let deactivatePromise: Promise | null = null; function deactivate() { - if (workerPool._active) { + if (!deactivatePromise) { + assert.equal(workerPool._active, true); workerPool._active = false; - return onDeactivate?.(); + + deactivatePromise = (async () => { + const errors: Error[] = []; + try { + await localQueue?.release(); + } catch (rawE) { + const e = coerceError(rawE); + errors.push(e); + // Log but continue regardless + logger.error(`Releasing local queue failed: ${e}`, { error: rawE }); + } + try { + // Note: this runs regardless of success of the above + await onDeactivate?.(); + } catch (rawE) { + const e = coerceError(rawE); + errors.push(e); + // Log but continue regardless + logger.error(`onDeactivate raised an error: ${e}`, { error: rawE }); + } + + if (errors.length > 0) { + throw new AggregateError( + errors, + "Errors occurred whilst deactivating queue", + ); + } + })(); } + return deactivatePromise; } let terminated = false; - function terminate() { + async function terminate() { if (!terminated) { terminated = true; + + /* Errors that should be raised from terminate() itself */ + const terminateErrors: Error[] = []; + + const releaseCompleteJobPromise = releaseCompleteJob?.(); + const releaseFailJobPromise = releaseFailJob?.(); + const [releaseCompleteJobResult, releaseFailJobResult] = + await Promise.allSettled([ + releaseCompleteJobPromise, + releaseFailJobPromise, + ]); + if (releaseCompleteJobResult.status === "rejected") { + const error = coerceError(releaseCompleteJobResult.reason); + _finErrors.push(error); + terminateErrors.push(error); + // Log but continue regardless + logger.error( + `Releasing complete job batcher failed: ${releaseCompleteJobResult.reason}`, + { error: releaseCompleteJobResult.reason }, + ); + } + if (releaseFailJobResult.status === "rejected") { + const error = coerceError(releaseFailJobResult.reason); + _finErrors.push(error); + terminateErrors.push(error); + // Log but continue regardless + logger.error( + `Releasing failed job batcher failed: ${releaseFailJobResult.reason}`, + { error: releaseFailJobResult.reason }, + ); + } + const idx = allWorkerPools.indexOf(workerPool); allWorkerPools.splice(idx, 1); - promise.resolve(onTerminate?.()); - if (unregisterSignalHandlers) { - unregisterSignalHandlers(); + + try { + await onTerminate?.(); + } catch (e) { + _finErrors.push(coerceError(e)); + terminateErrors.push(coerceError(e)); + } + + if (terminateErrors.length === 1) { + throw terminateErrors[0]; + } else if (terminateErrors.length > 1) { + throw new AggregateError( + terminateErrors, + "Errors occurred whilst terminating queue", + ); } } else { - logger.error( - `Graphile Worker internal error: terminate() was called twice for worker pool. Ignoring second call; but this indicates a bug - please file an issue.`, - ); + try { + throw new Error( + `Graphile Worker internal error: terminate() was called twice for worker pool. 
Ignoring second call; but this indicates a bug - please file an issue.`, + ); + } catch (e) { + logger.error(String((e as Error).stack)); + } } } @@ -602,23 +724,68 @@ export function _runTaskList( const abortSignal = abortController.signal; const abortPromise = new Promise((_resolve, reject) => { abortSignal.addEventListener("abort", () => { - reject(abortSignal.reason); + reject(coerceError(abortSignal.reason)); }); }); // Make sure Node doesn't get upset about unhandled rejection abortPromise.then(null, () => /* noop */ void 0); + let gracefulShutdownPromise: ReturnType< + WorkerPool["gracefulShutdown"] + > | null = null; + let forcefulShutdownPromise: ReturnType< + WorkerPool["forcefulShutdown"] + > | null = null; + + let finished = false; + const finWithError = (e: unknown) => { + if (finished) { + return; + } + finished = true; + if (e != null) { + _finErrors.push(coerceError(e)); + } + if (_finErrors.length === 1) { + _finPromise.reject(_finErrors[0]); + } else if (_finErrors.length > 1) { + _finPromise.reject( + new AggregateError( + _finErrors, + `Worker pool '${workerPool.id}' failed to shut down cleanly`, + ), + ); + } else { + _finPromise.resolve(); + } + + if (unregisterSignalHandlers) { + unregisterSignalHandlers(); + } + }; + const fin = () => finWithError(null); + // This is a representation of us that can be interacted with externally const workerPool: WorkerPool = { // "otpool" - "one time pool" id: `${continuous ? "pool" : "otpool"}-${randomBytes(9).toString("hex")}`, _active: true, _shuttingDown: false, + _forcefulShuttingDown: false, _workers: [], _withPgClient: withPgClient, get worker() { return concurrency === 1 ? this._workers[0] ?? null : null; }, + nudge(this: WorkerPool, count: number) { + if (localQueue) { + localQueue.pulse(count); + } else { + let n = count; + // Nudge up to `n` workers + this._workers.some((worker) => worker.nudge() && --n <= 0); + } + }, abortSignal, abortPromise, release() { @@ -632,199 +799,369 @@ export function _runTaskList( * Stop accepting jobs, and wait gracefully for the jobs that are in * progress to complete. 
*/ - async gracefulShutdown( - message = "Worker pool is shutting down gracefully", - ) { + gracefulShutdown(message = "Worker pool is shutting down gracefully") { if (workerPool._shuttingDown) { logger.error( `gracefulShutdown called when gracefulShutdown is already in progress`, ); - return; + return gracefulShutdownPromise!; + } + if (workerPool._forcefulShuttingDown) { + logger.error( + `gracefulShutdown called when forcefulShutdown is already in progress`, + ); + return Promise.resolve(forcefulShutdownPromise).then(() => { + throw new Error("Forceful shutdown already initiated"); + }); } + workerPool._shuttingDown = true; + gracefulShutdownPromise = middleware.run( + "poolGracefulShutdown", + { ctx, workerPool, message }, + async ({ message }) => { + events.emit("pool:gracefulShutdown", { + ctx, + pool: workerPool, + workerPool, + message, + }); + try { + logger.debug(`Attempting graceful shutdown`); + // Stop new jobs being added + const deactivatePromise = deactivate(); + + const gracefulShutdownErrors: Error[] = []; + + // Remove all the workers - we're shutting them down manually + const workers = [...workerPool._workers]; + const workerPromises = workers.map((worker) => worker.release()); + const [deactivateResult, ...workerReleaseResults] = + await Promise.allSettled([deactivatePromise, ...workerPromises]); + if (deactivateResult.status === "rejected") { + const error = coerceError(deactivateResult.reason); + _finErrors.push(error); + gracefulShutdownErrors.push(error); + // Log but continue regardless + logger.error(`Deactivation failed: ${deactivateResult.reason}`, { + error: deactivateResult.reason, + }); + } + const jobsToRelease: Job[] = []; + for (let i = 0; i < workerReleaseResults.length; i++) { + const workerReleaseResult = workerReleaseResults[i]; + if (workerReleaseResult.status === "rejected") { + const worker = workers[i]; + const job = worker.getActiveJob(); + events.emit("pool:gracefulShutdown:workerError", { + ctx, + pool: workerPool, + workerPool, + error: workerReleaseResult.reason, + job, + }); + logger.debug( + `Cancelling worker ${worker.workerId} (job: ${ + job?.id ?? 
"none" + }) failed`, + { + worker, + job, + reason: workerReleaseResult.reason, + }, + ); + if (job) { + jobsToRelease.push(job); + } + } + } + if (!this._forcefulShuttingDown && jobsToRelease.length > 0) { + try { + const workerIds = workers.map((worker) => worker.workerId); + logger.debug( + `Releasing the jobs ${jobsToRelease + .map((j) => j.id) + .join()} (workers: ${workerIds.join(", ")})`, + { + jobs: jobsToRelease, + workerIds, + }, + ); + const cancelledJobs = await failJobs( + compiledSharedOptions, + withPgClient, + workerPool.id, + jobsToRelease, + message, + ); + logger.debug(`Cancelled ${cancelledJobs.length} jobs`, { + cancelledJobs, + }); + } catch (e) { + gracefulShutdownErrors.push(coerceError(e)); + } + } - const abortTimer = setTimeout(() => { - abortController.abort(); - }, gracefulShutdownAbortTimeout); - abortTimer.unref(); + if (this._forcefulShuttingDown) { + // Do _not_ add to _finErrors + gracefulShutdownErrors.push( + new Error( + "forcefulShutdown was initiated whilst gracefulShutdown was still executing.", + ), + ); + } - events.emit("pool:gracefulShutdown", { - pool: workerPool, - workerPool, - message, - }); - try { - logger.debug(`Attempting graceful shutdown`); - // Stop new jobs being added - const deactivatePromise = deactivate(); - - // Remove all the workers - we're shutting them down manually - const workers = [...workerPool._workers]; - const workerPromises = workers.map((worker) => worker.release()); - const [deactivateResult, ...workerReleaseResults] = - await Promise.allSettled([deactivatePromise, ...workerPromises]); - if (deactivateResult.status === "rejected") { - // Log but continue regardless - logger.error(`Deactivation failed: ${deactivateResult.reason}`, { - error: deactivateResult.reason, - }); - } - const jobsToRelease: Job[] = []; - for (let i = 0; i < workerReleaseResults.length; i++) { - const workerReleaseResult = workerReleaseResults[i]; - if (workerReleaseResult.status === "rejected") { - const worker = workers[i]; - const job = worker.getActiveJob(); - events.emit("pool:gracefulShutdown:workerError", { + if (gracefulShutdownErrors.length === 1) { + throw gracefulShutdownErrors[0]; + } else if (gracefulShutdownErrors.length > 1) { + throw new AggregateError( + gracefulShutdownErrors, + "Errors occurred whilst shutting down worker", + ); + } + + events.emit("pool:gracefulShutdown:complete", { + ctx, pool: workerPool, workerPool, - error: workerReleaseResult.reason, - job, }); - logger.debug( - `Cancelling worker ${worker.workerId} (job: ${ - job?.id ?? "none" - }) failed`, - { - worker, - job, - reason: workerReleaseResult.reason, - }, + logger.debug("Graceful shutdown complete"); + } catch (e) { + events.emit("pool:gracefulShutdown:error", { + ctx, + pool: workerPool, + workerPool, + error: e, + }); + const message = coerceError(e).message; + logger.error( + `Error occurred during graceful shutdown: ${message}`, + { error: e }, ); - if (job) { - jobsToRelease.push(job); - } + + const forcefulPromise = + // Skip the warning about double shutdown + this._forcefulShuttingDown + ? forcefulShutdownPromise! 
+ : this.forcefulShutdown(message); + + // NOTE: we now rely on forcefulShutdown to handle terminate() + return Promise.resolve(forcefulPromise).then(() => { + throw e; + }); } - } - if (jobsToRelease.length > 0) { - const workerIds = workers.map((worker) => worker.workerId); - logger.debug( - `Releasing the jobs ${jobsToRelease - .map((j) => j.id) - .join()} (workers: ${workerIds.join(", ")})`, - { - jobs: jobsToRelease, - workerIds, - }, - ); - const cancelledJobs = await failJobs( - compiledSharedOptions, - withPgClient, - workerPool.id, - jobsToRelease, - message, - ); - logger.debug(`Cancelled ${cancelledJobs.length} jobs`, { - cancelledJobs, - }); - } - events.emit("pool:gracefulShutdown:complete", { - pool: workerPool, - workerPool, - }); - logger.debug("Graceful shutdown complete"); - } catch (e) { - events.emit("pool:gracefulShutdown:error", { - pool: workerPool, - workerPool, - error: e, - }); - const message = coerceError(e).message; - logger.error(`Error occurred during graceful shutdown: ${message}`, { - error: e, - }); - return this.forcefulShutdown(message); - } - terminate(); + if (!terminated) { + await terminate(); + } + }, + ); + + Promise.resolve(gracefulShutdownPromise).then(fin, finWithError); + + const abortTimer = setTimeout(() => { + abortController.abort(); + }, gracefulShutdownAbortTimeout); + abortTimer.unref(); + + return gracefulShutdownPromise; }, /** * Stop accepting jobs and "fail" all currently running jobs. */ - async forcefulShutdown(message: string) { - events.emit("pool:forcefulShutdown", { - pool: workerPool, - workerPool, - message, - }); - try { - logger.debug(`Attempting forceful shutdown`); - // Stop new jobs being added - const deactivatePromise = deactivate(); - - // Release all our workers' jobs - const workers = [...workerPool._workers]; - const jobsInProgress: Array = workers - .map((worker) => worker.getActiveJob()) - .filter((job): job is Job => !!job); - - // Remove all the workers - we're shutting them down manually - const workerPromises = workers.map((worker) => worker.release(true)); - // Ignore the results, we're shutting down anyway - // TODO: add a timeout - const [deactivateResult, ..._ignoreWorkerReleaseResults] = - await Promise.allSettled([deactivatePromise, ...workerPromises]); - if (deactivateResult.status === "rejected") { - // Log but continue regardless - logger.error(`Deactivation failed: ${deactivateResult.reason}`, { - error: deactivateResult.reason, - }); - } - - if (jobsInProgress.length > 0) { - const workerIds = workers.map((worker) => worker.workerId); - logger.debug( - `Releasing the jobs ${jobsInProgress - .map((j) => j.id) - .join()} (workers: ${workerIds.join(", ")})`, - { - jobs: jobsInProgress, - workerIds, - }, - ); - const cancelledJobs = await failJobs( - compiledSharedOptions, - withPgClient, - workerPool.id, - jobsInProgress, - message, - ); - logger.debug(`Cancelled ${cancelledJobs.length} jobs`, { - cancelledJobs, - }); - } else { - logger.debug("No active jobs to release"); - } - events.emit("pool:forcefulShutdown:complete", { - pool: workerPool, - workerPool, - }); - logger.debug("Forceful shutdown complete"); - } catch (e) { - events.emit("pool:forcefulShutdown:error", { - pool: workerPool, - workerPool, - error: e, - }); - const error = coerceError(e); + forcefulShutdown(message: string) { + if (workerPool._forcefulShuttingDown) { logger.error( - `Error occurred during forceful shutdown: ${error.message}`, - { error: e }, + `forcefulShutdown called when forcefulShutdown is already in progress`, ); + 
return forcefulShutdownPromise!; + } + if (!workerPool._shuttingDown) { + Promise.resolve(this.gracefulShutdown()).then(null, () => {}); } - terminate(); + + workerPool._forcefulShuttingDown = true; + forcefulShutdownPromise = middleware.run( + "poolForcefulShutdown", + { ctx, workerPool, message }, + async ({ message }) => { + events.emit("pool:forcefulShutdown", { + ctx, + pool: workerPool, + workerPool, + message, + }); + try { + logger.debug(`Attempting forceful shutdown`); + const timeout = new Promise((_resolve, reject) => { + const t = setTimeout( + () => reject(new Error("Timed out")), + 5000 /* TODO: make configurable */, + ); + t.unref(); + }); + + const wasAlreadyDeactivating = deactivatePromise != null; + // Stop new jobs being added + // NOTE: deactivate() immediately stops getJob working, even if the + // promise takes a while to resolve. + const deactiveateOrTimeout = Promise.race([deactivate(), timeout]); + + const forcefulShutdownErrors: Error[] = []; + + // Release all our workers' jobs + const workers = [...workerPool._workers]; + + // Remove all the workers - we're shutting them down manually + const workerReleasePromises = workers.map((worker) => { + // Note force=true means that this completes immediately _except_ + // it still calls the `stopWorker` async hook, so we must still + // handle a timeout. + return Promise.race([worker.release(true), timeout]); + }); + // Ignore the results, we're shutting down anyway + const [deactivateResult, ...workerReleaseResults] = + await Promise.allSettled([ + deactiveateOrTimeout, + ...workerReleasePromises, + ]); + if (deactivateResult.status === "rejected") { + // Log but continue regardless + logger.error(`Deactivation failed: ${deactivateResult.reason}`, { + error: deactivateResult.reason, + }); + const error = coerceError(deactivateResult.reason); + if (!wasAlreadyDeactivating) { + // Add this to _finErrors unless it's already there + _finErrors.push(error); + } + forcefulShutdownErrors.push(error); + } + + const workerProblems = workers + .map((worker, i) => { + const result = workerReleaseResults[i]; + const activeJob = worker.getActiveJob(); + if (result.status === "rejected") { + return [ + worker, + coerceError(result.reason), + activeJob, + ] as const; + } else if (activeJob) { + return [worker, null, activeJob] as const; + } else { + return null; + } + }) + .filter((t: T | null): t is T => t != null); + + const forceFailedJobs = workerProblems + .map(([, , job]) => job) + .filter((job): job is Job => !!job); + + if (forceFailedJobs.length > 0) { + const workerIds = workers.map((worker) => worker.workerId); + logger.debug( + `Releasing the jobs ${forceFailedJobs + .map((j) => j.id) + .join()} (workers: ${workerIds.join(", ")})`, + { + jobs: forceFailedJobs, + workerIds, + }, + ); + try { + const cancelledJobs = await failJobs( + compiledSharedOptions, + withPgClient, + workerPool.id, + forceFailedJobs, + message, + ); + + logger.debug(`Cancelled ${cancelledJobs.length} jobs`, { + cancelledJobs, + }); + } catch (e) { + const error = coerceError(e); + _finErrors.push(error); + forcefulShutdownErrors.push(error); + } + } else { + logger.debug("No active jobs to release"); + } + + for (const [worker, error, job] of workerProblems) { + // These are not a failure of forcefulShutdown, so do not go into + // forcefulShutdownErrors. + _finErrors.push( + new Error( + `Worker ${worker.workerId} ${ + job ? `with active job ${job.id}` : "" + } ${ + error + ? 
`failed to release, error: ${error})` + : `failed to stop working` + }`, + { cause: error }, + ), + ); + } + + if (forcefulShutdownErrors.length === 1) { + throw forcefulShutdownErrors[0]; + } else if (forcefulShutdownErrors.length > 1) { + throw new AggregateError( + forcefulShutdownErrors, + "Errors occurred whilst forcefully shutting down worker", + ); + } + + events.emit("pool:forcefulShutdown:complete", { + ctx, + pool: workerPool, + workerPool, + }); + logger.debug("Forceful shutdown complete"); + return { forceFailedJobs }; + } catch (e) { + events.emit("pool:forcefulShutdown:error", { + ctx, + pool: workerPool, + workerPool, + error: e, + }); + const error = coerceError(e); + _finErrors.push(error); + logger.error( + `Error occurred during forceful shutdown: ${error.message}`, + { error: e }, + ); + throw error; + } finally { + if (!terminated) { + await terminate(); + } + } + }, + ); + + Promise.resolve(forcefulShutdownPromise).then(fin, finWithError); + + return forcefulShutdownPromise; }, - promise, + promise: _finPromise, then(onfulfilled, onrejected) { - return promise.then(onfulfilled, onrejected); + return _finPromise.then(onfulfilled, onrejected); }, catch(onrejected) { - return promise.catch(onrejected); + return _finPromise.catch(onrejected); }, finally(onfinally) { - return promise.finally(onfinally); + return _finPromise.finally(onfinally); }, _start: autostart ? null @@ -835,9 +1172,11 @@ export function _runTaskList( }, }; - promise.finally(() => { - events.emit("pool:release", { pool: workerPool, workerPool }); - }); + _finPromise + .finally(() => { + events.emit("pool:release", { ctx, pool: workerPool, workerPool }); + }) + .catch(noop); abortSignal.addEventListener("abort", () => { if (!workerPool._shuttingDown) { @@ -847,7 +1186,7 @@ export function _runTaskList( // Ensure that during a forced shutdown we get cleaned up too allWorkerPools.push(workerPool); - events.emit("pool:create", { workerPool }); + events.emit("pool:create", { ctx, workerPool }); // Spawn our workers; they can share clients from the pool. const workerId = @@ -859,16 +1198,134 @@ export function _runTaskList( `You must not set workerId when concurrency > 1; each worker must have a unique identifier`, ); } - const getJob: GetJobFunction = async (workerId, flagsToSkip) => { - return baseGetJob( - compiledSharedOptions, - withPgClient, - tasks, - workerId, - flagsToSkip, - ); - }; - for (let i = 0; i < concurrency; i++) { + const localQueue = + localQueueSize >= 1 + ? new LocalQueue( + compiledSharedOptions, + tasks, + withPgClient, + workerPool, + localQueueSize, + continuous, + ) + : null; + const getJob: GetJobFunction = localQueue + ? async (workerId, flagsToSkip) => { + if (!workerPool._active) { + return undefined; + } + return localQueue.getJob(workerId, flagsToSkip); + } + : async (_workerId, flagsToSkip) => { + if (!workerPool._active) { + return undefined; + } + const jobs = await batchGetJobs( + compiledSharedOptions, + withPgClient, + tasks, + workerPool.id, + flagsToSkip, + 1, + ); + return jobs[0]; + }; + + const { release: releaseCompleteJob, fn: completeJob } = ( + completeJobBatchDelay >= 0 + ? 
batch( + "completeJobs", + completeJobBatchDelay, + (jobs) => + batchCompleteJobs( + compiledSharedOptions, + withPgClient, // batch handles retries and adds backpressure + workerPool.id, + jobs, + ), + (error, jobs) => { + events.emit("pool:fatalError", { + ctx, + error, + workerPool, + action: "completeJob", + }); + logger.error( + `Failed to complete jobs '${jobs + .map((j) => j.id) + .join("', '")}':\n${String(error)}`, + { fatalError: error, jobs }, + ); + if (!workerPool._shuttingDown) { + // This is the reason for shutdown + _finErrors.push(coerceError(error)); + workerPool.gracefulShutdown( + `Could not completeJobs; queue is in an inconsistent state; aborting.`, + ); + } + }, + BATCH_RETRY_OPTIONS, + ) + : { + release: null, + fn: (job) => + batchCompleteJobs( + compiledSharedOptions, + withPgClient.withRetries, + workerPool.id, + [job], + ), + } + ) as { release: (() => void) | null; fn: CompleteJobFunction }; + + const { release: releaseFailJob, fn: failJob } = ( + failJobBatchDelay >= 0 + ? batch( + "failJobs", + failJobBatchDelay, + (specs) => + batchFailJobs( + compiledSharedOptions, + withPgClient, // batch handles retries and adds backpressure + workerPool.id, + specs, + ), + (error, specs) => { + events.emit("pool:fatalError", { + ctx, + error, + workerPool, + action: "failJob", + }); + logger.error( + `Failed to fail jobs '${specs + .map((spec) => spec.job.id) + .join("', '")}':\n${String(error)}`, + { fatalError: error, specs }, + ); + if (!workerPool._shuttingDown) { + // This is the reason for shutdown + _finErrors.push(coerceError(error)); + workerPool.gracefulShutdown( + `Could not failJobs; queue is in an inconsistent state; aborting.`, + ); + } + }, + BATCH_RETRY_OPTIONS, + ) + : { + release: null, + fn: (spec) => + batchFailJobs( + compiledSharedOptions, + withPgClient.withRetries, + workerPool.id, + [spec], + ), + } + ) as { release: (() => void) | null; fn: FailJobFunction }; + + const createNewWorkerInPool = () => { const worker = makeNewWorker(compiledSharedOptions, { tasks, withPgClient, @@ -879,18 +1336,57 @@ export function _runTaskList( autostart, workerId, getJob, + completeJob, + failJob, }); workerPool._workers.push(worker); const remove = () => { + // Remove worker from the pool + workerPool._workers.splice(workerPool._workers.indexOf(worker), 1); if (continuous && workerPool._active && !workerPool._shuttingDown) { logger.error( `Worker exited, but pool is in continuous mode, is active, and is not shutting down... Did something go wrong?`, ); + + try { + let called = false; + const replaceWithNewWorker = () => { + if (called) { + // Ignore additional calls + return; + } + called = true; + createNewWorkerInPool(); + }; + + // Allows user to choose how to handle this; for example: + // - graceful shutdown (default behavior) + // - forceful shutdown (probably best after a delay?) + // - boot up a replacement worker via `createNewWorker` + middleware.runSync( + "poolWorkerPrematureExit", + { ctx, workerPool, worker, replaceWithNewWorker }, + () => { + throw new Error(`Worker ${worker.workerId} exited unexpectedly`); + }, + ); + } catch (e) { + if (!workerPool._shuttingDown) { + _finErrors.push(coerceError(e)); + workerPool.gracefulShutdown( + "Something went wrong, one of the workers exited prematurely. 
Shutting down.", + ); + } + } } - workerPool._workers.splice(workerPool._workers.indexOf(worker), 1); - if (!continuous && workerPool._workers.length === 0) { - deactivate(); - terminate(); + if (workerPool._workers.length === 0) { + if (!workerPool._shuttingDown) { + workerPool.gracefulShutdown( + continuous + ? "There are no remaining workers; exiting" + : "'Run once' mode processed all available jobs and is now exiting", + ); + } } }; worker.promise.then( @@ -903,9 +1399,12 @@ export function _runTaskList( logger.error(`Worker exited with error: ${error}`, { error }); }, ); - } + return worker; + }; - // TODO: handle when a worker shuts down (spawn a new one) + for (let i = 0; i < concurrency; i++) { + createNewWorkerInPool(); + } return workerPool; } @@ -944,3 +1443,141 @@ export const runTaskListOnce = ( return pool; }; + +/** + * On error we'll retry according to retryOptions. + */ +function batch( + opName: string, + delay: number, + rawCallback: (specs: ReadonlyArray) => Promise, + errorHandler: ( + error: unknown, + specs: ReadonlyArray, + ) => void | Promise, + retryOptions?: RetryOptions, +): { + release(): void | Promise; + fn: (spec: TSpec) => void | Promise; +} { + let pending = 0; + let releasing = false; + let released = false; + const incrementPending = () => { + pending++; + }; + const decrementPending = () => { + pending--; + if (releasing === true && pending === 0) { + released = true; + promise.resolve(); + } + }; + const promise = defer(); + + let backpressure: Deferred | null = null; + function holdup() { + if (!backpressure) { + incrementPending(); + backpressure = defer(); + } + } + function allgood() { + if (backpressure) { + backpressure.resolve(); + // Bump a tick to give the things held up by backpressure a chance to register. + process.nextTick(decrementPending); + } + } + + const callback = retryOptions + ? async (specs: ReadonlyArray): Promise => { + let lastError: ReturnType | undefined; + for ( + let previousAttempts = 0; + previousAttempts < retryOptions.maxAttempts; + previousAttempts++ + ) { + if (previousAttempts > 0) { + const code = lastError?.code as string; + const retryable = RETRYABLE_ERROR_CODES[code]; + const delay = calculateDelay(previousAttempts - 1, { + ...retryOptions, + // NOTE: `retryable` might be undefined, in which case `retryOptions` wins + ...retryable, + }); + console.error( + `${opName}: attempt ${previousAttempts}/${ + retryOptions.maxAttempts + } failed${ + code ? ` with code ${JSON.stringify(code)}` : `` + }; retrying after ${delay.toFixed(0)}ms. Error: ${lastError}`, + ); + await sleep(delay); + } + try { + const result = await rawCallback(specs); + // We succeeded - remove backpressure. + allgood(); + return result; + } catch (e) { + // Tell other callers to wait until we're successful again (i.e. apply backpressure) + holdup(); + lastError = coerceError(e); + throw e; + } + } + throw ( + lastError ?? 
+ new Error(`Failed after ${retryOptions.maxAttempts} attempts`) + ); + } + : rawCallback; + + let currentBatch: TSpec[] | null = null; + function handleSpec(spec: TSpec) { + if (released) { + throw new Error( + "This batcher has been released, and so no more calls can be made.", + ); + } + if (currentBatch !== null) { + currentBatch.push(spec); + } else { + const specs = [spec]; + currentBatch = specs; + incrementPending(); + setTimeout(() => { + currentBatch = null; + callback(specs).then(decrementPending, (error) => { + decrementPending(); + errorHandler(error, specs); + allgood(); + }); + }, delay); + } + return; + } + return { + async release() { + if (releasing) { + return; + } + releasing = true; + if (pending === 0) { + released = true; + promise.resolve(); + } + await promise; + }, + fn(spec) { + if (backpressure) { + return backpressure.then(() => handleSpec(spec)); + } else { + return handleSpec(spec); + } + }, + }; +} + +function noop() {} diff --git a/src/migrate.ts b/src/migrate.ts index dfa544fc..e679c4fc 100644 --- a/src/migrate.ts +++ b/src/migrate.ts @@ -34,25 +34,27 @@ export async function installSchema( compiledSharedOptions: CompiledSharedOptions, event: GraphileWorker.MigrateEvent, ) { - const { hooks, escapedWorkerSchema } = compiledSharedOptions; + const { hooks, escapedWorkerSchema, middleware } = compiledSharedOptions; (event as Writeable).postgresVersion = await fetchAndCheckPostgresVersion(event.client); - await hooks.process("prebootstrap", event); - // Change to this query should be reflected in website/docs/schema.md - await event.client.query(` - create schema if not exists ${escapedWorkerSchema}; - create table if not exists ${escapedWorkerSchema}.migrations( - id int primary key, - ts timestamptz default now() not null + await middleware.run("bootstrap", event, async (event) => { + await hooks.process("prebootstrap", event); + // Change to this query should be reflected in website/docs/schema.md + await event.client.query(`\ +create schema if not exists ${escapedWorkerSchema}; +create table if not exists ${escapedWorkerSchema}.migrations( + id int primary key, + ts timestamptz default now() not null +); +alter table ${escapedWorkerSchema}.migrations add column if not exists breaking boolean not null default false; +`); + await event.client.query( + `update ${escapedWorkerSchema}.migrations set breaking = true where id = any($1::int[])`, + [BREAKING_MIGRATIONS], ); - alter table ${escapedWorkerSchema}.migrations add column if not exists breaking boolean not null default false; - `); - await event.client.query( - `update ${escapedWorkerSchema}.migrations set breaking = true where id = any($1::int[])`, - [BREAKING_MIGRATIONS], - ); - await hooks.process("postbootstrap", event); + await hooks.process("postbootstrap", event); + }); } /** @internal */ @@ -95,7 +97,10 @@ export async function runMigration( const error = coerceError(rawError); await event.client.query("rollback"); await hooks.process("migrationError", { ...event, error }); - if (!migrationInsertComplete && error.code === "23505") { + if ( + !migrationInsertComplete && + CLASH_CODES.includes(error.code as string) + ) { // Someone else did this migration! Success! 
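      // Editorial note (not part of the patch): the CLASH_CODES check above and
      // the NX_CODES check in migrate() below rely on standard PostgreSQL error
      // codes (the arrays are defined at the bottom of this file); the glosses
      // here are a reading aid only.
      const PG_CODE_GLOSS_SKETCH: Record<string, string> = {
        "42P01": "undefined_table: worker schema/tables not installed yet",
        "42703": "undefined_column: likely a migrations table that predates the `breaking` column",
        "23505": "unique_violation: another worker inserted the migration row first",
        "42P06": "duplicate_schema: another worker created the schema concurrently",
        "42P07": "duplicate_table: another worker created the table concurrently",
        "42710": "duplicate_object: another worker created the object concurrently",
      };
      // (End of note.)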
logger.debug( `Some other worker has performed migration ${migrationFile}; continuing.`, @@ -116,10 +121,16 @@ export async function migrate( compiledSharedOptions: CompiledSharedOptions, client: PoolClient, ) { - const { escapedWorkerSchema, hooks, logger } = compiledSharedOptions; + const { escapedWorkerSchema, hooks, logger, middleware } = + compiledSharedOptions; let latestMigration: number | null = null; let latestBreakingMigration: number | null = null; - const event = { client, postgresVersion: 0, scratchpad: Object.create(null) }; + const event = { + ctx: compiledSharedOptions, + client, + postgresVersion: 0, + scratchpad: Object.create(null), + }; for (let attempts = 0; attempts < 2; attempts++) { try { const { @@ -138,13 +149,13 @@ select current_setting('server_version_num') as server_version_num, break; } catch (rawE) { const e = coerceError(rawE); - if (attempts === 0 && (e.code === "42P01" || e.code === "42703")) { + if (attempts === 0 && NX_CODES.includes(e.code as string)) { try { await installSchema(compiledSharedOptions, event); break; } catch (rawE2) { const e2 = coerceError(rawE2); - if (e2.code === "23505") { + if (CLASH_CODES.includes(e2.code as string)) { // Another instance installed this concurrently? Go around again. } else { throw e2; @@ -159,40 +170,49 @@ select current_setting('server_version_num') as server_version_num, await sleep(400 + Math.random() * 200); } - await hooks.process("premigrate", event); + await middleware.run("migrate", event, async (event) => { + await hooks.process("premigrate", event); - const migrationFiles = Object.keys(migrations) as (keyof typeof migrations)[]; - let highestMigration = 0; - let migrated = false; - for (const migrationFile of migrationFiles) { - const migrationNumber = parseInt(migrationFile.slice(0, 6), 10); - if (migrationNumber > highestMigration) { - highestMigration = migrationNumber; - } - if (latestMigration == null || migrationNumber > latestMigration) { - migrated = true; - await runMigration( - compiledSharedOptions, - event, - migrationFile, - migrationNumber, - ); + const migrationFiles = Object.keys( + migrations, + ) as (keyof typeof migrations)[]; + let highestMigration = 0; + let migrated = false; + for (const migrationFile of migrationFiles) { + const migrationNumber = parseInt(migrationFile.slice(0, 6), 10); + if (migrationNumber > highestMigration) { + highestMigration = migrationNumber; + } + if (latestMigration == null || migrationNumber > latestMigration) { + migrated = true; + await runMigration( + compiledSharedOptions, + event, + migrationFile, + migrationNumber, + ); + } } - } - if (migrated) { - logger.debug(`Migrations complete`); - } + if (migrated) { + logger.debug(`Migrations complete`); + } - if (latestBreakingMigration && highestMigration < latestBreakingMigration) { - process.exitCode = 57; - throw new Error( - `Database is using Graphile Worker schema revision ${latestMigration} which includes breaking migration ${latestBreakingMigration}, but the currently running worker only supports up to revision ${highestMigration}. It would be unsafe to continue; please ensure all versions of Graphile Worker are compatible.`, - ); - } else if (latestMigration && highestMigration < latestMigration) { - logger.warn( - `Database is using Graphile Worker schema revision ${latestMigration}, but the currently running worker only supports up to revision ${highestMigration} which may or may not be compatible. 
Please ensure all versions of Graphile Worker you're running are compatible, or use Worker Pro which will perform this check for you. Attempting to continue regardless.`, - ); - } - await hooks.process("postmigrate", event); + if (latestBreakingMigration && highestMigration < latestBreakingMigration) { + process.exitCode = 57; + throw new Error( + `Database is using Graphile Worker schema revision ${latestMigration} which includes breaking migration ${latestBreakingMigration}, but the currently running worker only supports up to revision ${highestMigration}. It would be unsafe to continue; please ensure all versions of Graphile Worker are compatible.`, + ); + } else if (latestMigration && highestMigration < latestMigration) { + logger.warn( + `Database is using Graphile Worker schema revision ${latestMigration}, but the currently running worker only supports up to revision ${highestMigration} which may or may not be compatible. Please ensure all versions of Graphile Worker you're running are compatible, or use Worker Pro which will perform this check for you. Attempting to continue regardless.`, + ); + } + await hooks.process("postmigrate", event); + }); } + +/** Doesn't exist */ +const NX_CODES = ["42P01", "42703"]; +/** Someone else created */ +const CLASH_CODES = ["23505", "42P06", "42P07", "42710"]; diff --git a/src/runner.ts b/src/runner.ts index 9274a878..6d7f3855 100644 --- a/src/runner.ts +++ b/src/runner.ts @@ -6,6 +6,7 @@ import { Runner, RunnerOptions, TaskList, + WorkerPluginContext, } from "./interfaces"; import { coerceError, @@ -149,6 +150,7 @@ function buildRunner(input: { release: () => PromiseOrDirect; }): Runner { const { compiledOptions, taskList, parsedCronItems, release } = input; + const ctx: WorkerPluginContext = compiledOptions; const { events, pgPool, releasers, addJob, logger } = compiledOptions; const cron = runCron(compiledOptions, parsedCronItems, { pgPool, events }); @@ -166,7 +168,7 @@ function buildRunner(input: { compiledOptions.logger.debug("Runner stopping"); if (running) { running = false; - events.emit("stop", {}); + events.emit("stop", { ctx }); try { const promises: Array> = []; if (cron._active) { @@ -188,19 +190,27 @@ function buildRunner(input: { throw new Error("Runner is already stopped"); } }; + const kill = async () => { + if (running) { + stop().catch(() => {}); + } + if (workerPool._active) { + await workerPool.forcefulShutdown(`Terminated through .kill() command`); + } + }; - workerPool.promise.finally(() => { + const wp = workerPool.promise.finally(() => { if (running) { stop(); } }); - cron.promise.finally(() => { + const cp = cron.promise.finally(() => { if (running) { stop(); } }); - const promise = Promise.all([cron.promise, workerPool.promise]).then( + const promise = Promise.all([cp, wp]).then( () => { /* noop */ }, @@ -220,6 +230,7 @@ function buildRunner(input: { return { stop, + kill, addJob, promise, events, diff --git a/src/sql/completeJob.ts b/src/sql/completeJob.ts deleted file mode 100644 index 404235b6..00000000 --- a/src/sql/completeJob.ts +++ /dev/null @@ -1,49 +0,0 @@ -import { DbJob, EnhancedWithPgClient } from "../interfaces"; -import { CompiledSharedOptions } from "../lib"; - -export async function completeJob( - compiledSharedOptions: CompiledSharedOptions, - withPgClient: EnhancedWithPgClient, - poolId: string, - job: DbJob, -): Promise { - const { - escapedWorkerSchema, - workerSchema, - resolvedPreset: { - worker: { preparedStatements }, - }, - } = compiledSharedOptions; - - // TODO: retry logic, in case of server 
connection interruption - if (job.job_queue_id != null) { - await withPgClient.withRetries((client) => - client.query({ - text: `\ -with j as ( -delete from ${escapedWorkerSchema}._private_jobs as jobs -where id = $1::bigint -returning * -) -update ${escapedWorkerSchema}._private_job_queues as job_queues -set locked_by = null, locked_at = null -from j -where job_queues.id = j.job_queue_id and job_queues.locked_by = $2::text;`, - values: [job.id, poolId], - name: !preparedStatements - ? undefined - : `complete_job_q/${workerSchema}`, - }), - ); - } else { - await withPgClient.withRetries((client) => - client.query({ - text: `\ -delete from ${escapedWorkerSchema}._private_jobs as jobs -where id = $1::bigint`, - values: [job.id], - name: !preparedStatements ? undefined : `complete_job/${workerSchema}`, - }), - ); - } -} diff --git a/src/sql/completeJobs.ts b/src/sql/completeJobs.ts new file mode 100644 index 00000000..8e861aaa --- /dev/null +++ b/src/sql/completeJobs.ts @@ -0,0 +1,86 @@ +import { DbJob, WithPgClient } from "../interfaces"; +import { CompiledSharedOptions } from "../lib"; + +const manualPrepare = false; + +export async function batchCompleteJobs( + compiledSharedOptions: CompiledSharedOptions, + withPgClient: WithPgClient, + poolId: string, + jobs: ReadonlyArray, +): Promise { + const { + escapedWorkerSchema, + workerSchema, + resolvedPreset: { + worker: { preparedStatements }, + }, + } = compiledSharedOptions; + + const jobIdsWithoutQueue: string[] = []; + const jobIdsWithQueue: string[] = []; + for (const job of jobs) { + if (job.job_queue_id != null) { + jobIdsWithQueue.push(job.id); + } else { + jobIdsWithoutQueue.push(job.id); + } + } + + if (jobIdsWithQueue.length > 0) { + await withPgClient((client) => + client.query({ + text: `\ +with j as ( +delete from ${escapedWorkerSchema}._private_jobs as jobs +using unnest($1::bigint[]) n(n) +where id = n +returning * +) +update ${escapedWorkerSchema}._private_job_queues as job_queues +set locked_by = null, locked_at = null +from j +where job_queues.id = j.job_queue_id and job_queues.locked_by = $2::text;`, + values: [jobIdsWithQueue, poolId], + name: !preparedStatements + ? undefined + : `complete_job_q/${workerSchema}`, + }), + ); + } + if (jobIdsWithoutQueue.length === 1) { + await withPgClient((client) => + client.query({ + text: `\ +delete from ${escapedWorkerSchema}._private_jobs as jobs +where id = $1::bigint`, + values: [jobIdsWithoutQueue[0]], + name: !preparedStatements ? undefined : `complete_job/${workerSchema}`, + }), + ); + } else if (jobIdsWithoutQueue.length > 1) { + if (manualPrepare) { + await withPgClient((client) => + client.query({ + text: `\ +prepare gwcj (bigint) as delete from ${escapedWorkerSchema}._private_jobs where id = $1; +${jobIdsWithoutQueue.map((id) => `execute gwcj(${id});`).join("\n")} +deallocate gwcj;`, + }), + ); + } else { + await withPgClient((client) => + client.query({ + text: `\ +delete from ${escapedWorkerSchema}._private_jobs as jobs +using unnest($1::bigint[]) n(n) +where id = n`, + values: [jobIdsWithoutQueue], + name: !preparedStatements + ? 
undefined + : `complete_jobs/${workerSchema}`, + }), + ); + } + } +} diff --git a/src/sql/failJob.ts b/src/sql/failJobs.ts similarity index 60% rename from src/sql/failJob.ts rename to src/sql/failJobs.ts index 63a08884..96bc1d00 100644 --- a/src/sql/failJob.ts +++ b/src/sql/failJobs.ts @@ -1,13 +1,16 @@ -import { DbJob, EnhancedWithPgClient } from "../interfaces"; +import { DbJob, EnhancedWithPgClient, WithPgClient } from "../interfaces"; import { CompiledSharedOptions } from "../lib"; +interface Spec { + job: DbJob; + message: string; + replacementPayload: undefined | unknown[]; +} -export async function failJob( +export async function batchFailJobs( compiledSharedOptions: CompiledSharedOptions, - withPgClient: EnhancedWithPgClient, + withPgClient: WithPgClient, poolId: string, - job: DbJob, - message: string, - replacementPayload: undefined | unknown[], + specs: ReadonlyArray, ): Promise { const { escapedWorkerSchema, @@ -17,56 +20,73 @@ export async function failJob( }, } = compiledSharedOptions; - // TODO: retry logic, in case of server connection interruption - if (job.job_queue_id != null) { - await withPgClient.withRetries((client) => + const specsWithQueues: Spec[] = []; + const specsWithoutQueues: Spec[] = []; + + for (const spec of specs) { + if (spec.job.job_queue_id != null) { + specsWithQueues.push(spec); + } else { + specsWithoutQueues.push(spec); + } + } + + if (specsWithQueues.length > 0) { + await withPgClient((client) => client.query({ text: `\ with j as ( update ${escapedWorkerSchema}._private_jobs as jobs set -last_error = $2::text, +last_error = (el->>'message'), run_at = greatest(now(), run_at) + (exp(least(attempts, 10)) * interval '1 second'), locked_by = null, locked_at = null, -payload = coalesce($4::json, jobs.payload) -where id = $1::bigint and locked_by = $3::text +payload = coalesce(el->'payload', jobs.payload) +from json_array_elements($2::json) as els(el) +where id = (el->>'jobId')::bigint and locked_by = $1::text returning * ) update ${escapedWorkerSchema}._private_job_queues as job_queues set locked_by = null, locked_at = null from j -where job_queues.id = j.job_queue_id and job_queues.locked_by = $3::text;`, +where job_queues.id = j.job_queue_id and job_queues.locked_by = $1::text;`, values: [ - job.id, - message, poolId, - replacementPayload != null - ? JSON.stringify(replacementPayload) - : null, + JSON.stringify( + specsWithQueues.map(({ job, message, replacementPayload }) => ({ + jobId: job.id, + message, + payload: replacementPayload, + })), + ), ], name: !preparedStatements ? undefined : `fail_job_q/${workerSchema}`, }), ); - } else { - await withPgClient.withRetries((client) => + } + if (specsWithoutQueues.length > 0) { + await withPgClient((client) => client.query({ text: `\ update ${escapedWorkerSchema}._private_jobs as jobs set -last_error = $2::text, +last_error = (el->>'message'), run_at = greatest(now(), run_at) + (exp(least(attempts, 10)) * interval '1 second'), locked_by = null, locked_at = null, -payload = coalesce($4::json, jobs.payload) -where id = $1::bigint and locked_by = $3::text;`, +payload = coalesce(el->'payload', jobs.payload) +from json_array_elements($2::json) as els(el) +where id = (el->>'jobId')::bigint and locked_by = $1::text;`, values: [ - job.id, - message, poolId, - replacementPayload != null - ? 
JSON.stringify(replacementPayload) - : null, + JSON.stringify( + specsWithoutQueues.map(({ job, message, replacementPayload }) => ({ + jobId: job.id, + message, + payload: replacementPayload, + })), + ), ], name: !preparedStatements ? undefined : `fail_job/${workerSchema}`, }), @@ -89,7 +109,6 @@ export async function failJobs( }, } = compiledSharedOptions; - // TODO: retry logic, in case of server connection interruption const { rows: failedJobs } = await withPgClient.withRetries((client) => client.query({ text: `\ diff --git a/src/sql/getJob.ts b/src/sql/getJobs.ts similarity index 92% rename from src/sql/getJob.ts rename to src/sql/getJobs.ts index 7f1f2247..0ceac97a 100644 --- a/src/sql/getJob.ts +++ b/src/sql/getJobs.ts @@ -11,13 +11,15 @@ export function isPromise(t: T | Promise): t is Promise { ); } -export async function getJob( +export async function batchGetJobs( compiledSharedOptions: CompiledSharedOptions, withPgClient: EnhancedWithPgClient, tasks: TaskList, poolId: string, flagsToSkip: string[] | null, -): Promise { + rawBatchSize: number, +): Promise { + const batchSize = parseInt(String(rawBatchSize), 10) || 1; const { escapedWorkerSchema, workerSchema, @@ -38,7 +40,7 @@ export async function getJob( if (taskDetails.taskIds.length === 0) { logger.error("No tasks found; nothing to do!"); - return undefined; + return []; } let i = 2; @@ -157,7 +159,7 @@ with j as ( ${queueClause} ${flagsClause} order by priority asc, run_at asc - limit 1 + limit ${batchSize} for update skip locked )${updateQueue} @@ -179,23 +181,21 @@ with j as ( ]; const name = !preparedStatements ? undefined - : `get_job${hasFlags ? "F" : ""}${useNodeTime ? "N" : ""}/${workerSchema}`; + : `get_job${batchSize === 1 ? "" : batchSize}${hasFlags ? "F" : ""}${ + useNodeTime ? 
"N" : "" + }/${workerSchema}`; - const { - rows: [jobRow], - } = await withPgClient.withRetries((client) => + const { rows } = await withPgClient.withRetries((client) => client.query({ text, values, name, }), ); - if (jobRow) { - return Object.assign(jobRow, { + return rows.map((jobRow) => + Object.assign(jobRow, { task_identifier: taskDetails.supportedTaskIdentifierByTaskId[jobRow.task_id], - }); - } else { - return undefined; - } + }), + ); } diff --git a/src/sql/returnJobs.ts b/src/sql/returnJobs.ts new file mode 100644 index 00000000..988c5f64 --- /dev/null +++ b/src/sql/returnJobs.ts @@ -0,0 +1,68 @@ +import { DbJob, WithPgClient } from "../interfaces"; +import { CompiledSharedOptions } from "../lib"; + +export async function returnJobs( + compiledSharedOptions: CompiledSharedOptions, + withPgClient: WithPgClient, + poolId: string, + jobs: ReadonlyArray, +): Promise { + const { + escapedWorkerSchema, + workerSchema, + resolvedPreset: { + worker: { preparedStatements }, + }, + } = compiledSharedOptions; + + const jobsWithQueues: DbJob[] = []; + const jobsWithoutQueues: DbJob[] = []; + + for (const job of jobs) { + if (job.job_queue_id != null) { + jobsWithQueues.push(job); + } else { + jobsWithoutQueues.push(job); + } + } + + if (jobsWithQueues.length > 0) { + await withPgClient((client) => + client.query({ + text: `\ +with j as ( +update ${escapedWorkerSchema}._private_jobs as jobs +set +attempts = GREATEST(0, attempts - 1), +locked_by = null, +locked_at = null +where id = ANY($2::bigint[]) +and locked_by = $1::text +returning * +) +update ${escapedWorkerSchema}._private_job_queues as job_queues +set locked_by = null, locked_at = null +from j +where job_queues.id = j.job_queue_id and job_queues.locked_by = $1::text;`, + values: [poolId, jobsWithQueues.map((job) => job.id)], + name: !preparedStatements ? undefined : `return_job_q/${workerSchema}`, + }), + ); + } + if (jobsWithoutQueues.length > 0) { + await withPgClient((client) => + client.query({ + text: `\ +update ${escapedWorkerSchema}._private_jobs as jobs +set +attempts = GREATEST(0, attempts - 1), +locked_by = null, +locked_at = null +where id = ANY($2::bigint[]) +and locked_by = $1::text;`, + values: [poolId, jobsWithoutQueues.map((job) => job.id)], + name: !preparedStatements ? 
undefined : `return_job/${workerSchema}`, + }), + ); + } +} diff --git a/src/version.ts b/src/version.ts index 8756a0f5..989fa0d2 100644 --- a/src/version.ts +++ b/src/version.ts @@ -1,2 +1,2 @@ // This file is autogenerated by /scripts/postversion.mjs -export const version = "0.16.6"; +export const version = "0.17.0-canary.1fcb2a0"; diff --git a/src/worker.ts b/src/worker.ts index cda0837f..5a3fad39 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -4,7 +4,9 @@ import { randomBytes } from "crypto"; import deferred from "./deferred"; import { makeJobHelpers } from "./helpers"; import { + CompleteJobFunction, EnhancedWithPgClient, + FailJobFunction, GetJobFunction, Job, PromiseOrDirect, @@ -14,8 +16,6 @@ import { WorkerSharedOptions, } from "./interfaces"; import { coerceError, CompiledSharedOptions } from "./lib"; -import { completeJob } from "./sql/completeJob"; -import { failJob } from "./sql/failJob"; const NO_LOG_SUCCESS = !!process.env.NO_LOG_SUCCESS; @@ -31,8 +31,11 @@ export function makeNewWorker( autostart?: boolean; workerId?: string; getJob: GetJobFunction; + completeJob: CompleteJobFunction; + failJob: FailJobFunction; }, ): Worker { + const ctx = compiledSharedOptions; const { tasks, withPgClient, @@ -43,6 +46,8 @@ export function makeNewWorker( autostart = true, workerId = `worker-${randomBytes(9).toString("hex")}`, getJob, + completeJob, + failJob, } = params; const { events, @@ -62,16 +67,18 @@ export function makeNewWorker( const promise: Promise & { /** @internal */ worker?: Worker; - } = workerDeferred.finally(() => { - return hooks.process("stopWorker", { worker, withPgClient }); - }); + } = workerDeferred + .finally(() => { + return hooks.process("stopWorker", { worker, withPgClient }); + }) + .catch(noop); promise.then( () => { - events.emit("worker:stop", { worker }); + events.emit("worker:stop", { ctx, worker }); }, (error) => { - events.emit("worker:stop", { worker, error }); + events.emit("worker:stop", { ctx, worker, error }); }, ); let activeJob: Job | null = null; @@ -90,7 +97,7 @@ export function makeNewWorker( const release = (force = false) => { if (active) { active = false; - events.emit("worker:release", { worker }); + events.emit("worker:release", { ctx, worker }); if (cancelDoNext()) { workerDeferred.resolve(); @@ -132,7 +139,7 @@ export function makeNewWorker( }, }; - events.emit("worker:create", { worker, tasks }); + events.emit("worker:create", { ctx, worker, tasks }); logger.debug(`Spawned`); @@ -172,7 +179,7 @@ export function makeNewWorker( flagsToSkip = event.flagsToSkip; } - events.emit("worker:getJob:start", { worker }); + events.emit("worker:getJob:start", { ctx, worker }); const jobRow = await getJob(workerPool.id, flagsToSkip); // `doNext` cannot be executed concurrently, so we know this is safe. @@ -180,13 +187,13 @@ export function makeNewWorker( activeJob = jobRow && jobRow.id ? 
jobRow : null; if (activeJob) { - events.emit("job:start", { worker, job: activeJob }); + events.emit("job:start", { ctx, worker, job: activeJob }); } else { - events.emit("worker:getJob:empty", { worker }); + events.emit("worker:getJob:empty", { ctx, worker }); } } catch (rawErr) { const err = coerceError(rawErr); - events.emit("worker:getJob:error", { worker, error: err }); + events.emit("worker:getJob:error", { ctx, worker, error: err }); if (continuous) { contiguousErrors++; logger.debug( @@ -294,6 +301,7 @@ export function makeNewWorker( if (err) { try { events.emit("job:error", { + ctx, worker, job, error: err, @@ -309,6 +317,7 @@ export function makeNewWorker( try { // Failed forever events.emit("job:failed", { + ctx, worker, job, error: err, @@ -342,20 +351,18 @@ export function makeNewWorker( }`, { failure: true, job, error: err, duration }, ); - await failJob( - compiledSharedOptions, - withPgClient, - workerPool.id, + failJob({ job, message, // "Batch jobs": copy through only the unsuccessful parts of the payload - batchJobFailedPayloads.length > 0 - ? batchJobFailedPayloads - : undefined, - ); + replacementPayload: + batchJobFailedPayloads.length > 0 + ? batchJobFailedPayloads + : undefined, + }); } else { try { - events.emit("job:success", { worker, job }); + events.emit("job:success", { ctx, worker, job }); } catch (e) { logger.error( "Error occurred in event emitter for 'job:success'; this is an issue in your application code and you should fix it", @@ -374,17 +381,13 @@ export function makeNewWorker( ); } - await completeJob( - compiledSharedOptions, - withPgClient, - workerPool.id, - job, - ); + completeJob(job); } - events.emit("job:complete", { worker, job, error: err }); + events.emit("job:complete", { ctx, worker, job, error: err }); } catch (fatalError) { try { events.emit("worker:fatalError", { + ctx, worker, error: fatalError, jobError: err, @@ -426,3 +429,5 @@ export function makeNewWorker( return worker; } + +function noop() {} diff --git a/towerDefence/README.md b/towerDefence/README.md new file mode 100644 index 00000000..f1919514 --- /dev/null +++ b/towerDefence/README.md @@ -0,0 +1,15 @@ +# Tower defence test + +With the advanced options like localQueue and refetchDelay, Graphile Worker gets +quite complex and testing it becomes a challenge. When there's enough work to go +around (as in `perfTest`), testing is easy and the system handles admirably. But +things become more complex when there's not enough work to go around: we still +want to execute jobs quickly, but we don't want all 10 Graphile Worker instances +sending a query to the DB each time a new job comes in. + +This folder mounts a "tower defence"-style attack against a cluster of Graphile +Worker instances; it's designed to a) make sure no bugs happen, and b) let us +monitor system metrics under various load conditions. We start with the setup +phase where we build our towers (Graphile Worker instances) and then we send +different "waves" of jobs at the towers to ensure everything continues to work +smoothly. 
diff --git a/towerDefence/crontab b/towerDefence/crontab new file mode 100644 index 00000000..e69de29b diff --git a/towerDefence/graphile.config.mjs b/towerDefence/graphile.config.mjs new file mode 100644 index 00000000..e3bd3dbe --- /dev/null +++ b/towerDefence/graphile.config.mjs @@ -0,0 +1,152 @@ +// @ts-check + +/** @typedef {import("../dist/index.js")} Worker */ +// import type {} from "../src/index.js"; + +// import { WorkerProPreset } from "../graphile-pro-worker/dist/index.js"; + +const CONCURRENT_JOBS = 10; +export const PARALLELISM = 10; + +const stats = { + fetches: 0, + emptyFetches: 0, + jobsFetched: 0, + jobsReturned: 0, + timeInMode: Object.create(null), + timeInRefetchDelay: 0n, + refetchDelays: 0, + refetchDelaysAborted: 0, + maxLatency: 0, + latencySum: 0, +}; + +let lastModeStart = process.hrtime.bigint(); +let refetchDelayStart = process.hrtime.bigint(); + +/** @type {(value: number | string, width?: number, char?: string) => string} */ +const p = (v, w = 10, s = " ") => String(v).padStart(w, s); + +/** @type {(t: bigint) => string} */ +const ms = (t) => { + return `${(Number(t) / 1e6).toFixed(2)}ms`; +}; + +/** @type {() => string} */ +const tim = () => { + let results = []; + for (const m in stats.timeInMode) { + results.push(p(`${p(m)}=${ms(stats.timeInMode[m])}`, 19)); + } + return results.join(","); +}; + +/** @type {GraphileConfig.Plugin} */ +const TowerDefenceResultPlugin = { + name: "TowerDefenceResultPlugin", + version: "0.0.0", + worker: { + hooks: { + init(ctx) { + ctx.events.on("pool:release", (event) => { + console.log( + `\nPool ${event.workerPool.id} released\nFetches=${p( + stats.fetches, + 5, + )}(empty=${p(stats.emptyFetches, 5)};maxLatency=${p( + stats.maxLatency, + 4, + )}ms;avgLatency=${p( + stats.jobsFetched + ? 
(stats.latencySum / stats.jobsFetched).toFixed(2) + : "-", + 8, + )}ms)|Fetched=${p(stats.jobsFetched, 6)}|Returned=${p( + stats.jobsReturned, + 6, + )}|TotalDelay=${p(ms(stats.timeInRefetchDelay), 11)}(Aborted=${p( + `${stats.refetchDelaysAborted}/${stats.refetchDelays}`, + 9, + )})|${tim()}\n`, + ); + }); + ctx.events.on("localQueue:getJobs:complete", ({ jobs }) => { + stats.fetches += 1; + if (jobs.length === 0) { + stats.emptyFetches += 1; + } + stats.jobsFetched += jobs.length; + }); + ctx.events.on("localQueue:returnJobs", ({ jobs }) => { + stats.jobsReturned += jobs.length; + }); + ctx.events.on("localQueue:init", () => { + lastModeStart = process.hrtime.bigint(); + }); + ctx.events.on("localQueue:setMode", ({ oldMode, newMode }) => { + const now = process.hrtime.bigint(); + const diff = now - lastModeStart; + lastModeStart = now; + stats.timeInMode[oldMode] ??= 0n; + stats.timeInMode[oldMode] += diff; + }); + ctx.events.on("localQueue:refetchDelay:start", () => { + stats.refetchDelays += 1; + refetchDelayStart = process.hrtime.bigint(); + }); + ctx.events.on("localQueue:refetchDelay:abort", () => { + stats.refetchDelaysAborted += 1; + const elapsed = process.hrtime.bigint() - refetchDelayStart; + stats.timeInRefetchDelay += elapsed; + }); + ctx.events.on("localQueue:refetchDelay:expired", () => { + const elapsed = process.hrtime.bigint() - refetchDelayStart; + stats.timeInRefetchDelay += elapsed; + }); + ctx.events.on("job:start", (event) => { + const l = Date.now() - +event.job.run_at; + stats.latencySum += l; + if (l > stats.maxLatency) { + stats.maxLatency = l; + } + }); + }, + }, + }, +}; + +const localQueueSize = CONCURRENT_JOBS + 1; + +/** @type {GraphileConfig.Preset} */ +const preset = { + // extends: [WorkerProPreset], + worker: { + connectionString: + process.env.PERF_DATABASE_URL || "postgres:///graphile_worker_perftest", + fileExtensions: [".js", ".cjs", ".mjs"], + // fileExtensions: [".js", ".cjs", ".mjs", ".ts", ".cts", ".mts"], + gracefulShutdownAbortTimeout: 2500, + + concurrentJobs: CONCURRENT_JOBS, + maxPoolSize: CONCURRENT_JOBS + 1, + + //localQueue: { size: -1 }, + //completeJobBatchDelay: -1, + //failJobBatchDelay: -1, + + pollInterval: 2000, + localQueue: { + size: localQueueSize, + refetchDelay: { + durationMs: 1000, + threshold: localQueueSize - 1, + maxAbortThreshold: CONCURRENT_JOBS * PARALLELISM, + }, + }, + completeJobBatchDelay: 0, + failJobBatchDelay: 0, + }, + plugins: [TowerDefenceResultPlugin], +}; + +export default preset; diff --git a/towerDefence/run.mjs b/towerDefence/run.mjs new file mode 100755 index 00000000..02453ca2 --- /dev/null +++ b/towerDefence/run.mjs @@ -0,0 +1,228 @@ +#!/usr/bin/env node +// @ts-check +import { execSync, spawn } from "child_process"; +import pg from "pg"; + +import { makeWorkerUtils } from "../dist/index.js"; +import config, { PARALLELISM } from "./graphile.config.mjs"; + +const CONCURRENCY = config.worker?.concurrentJobs ?? 
1; +/** How long each individual task sleeps for */ +const SLEEP_TIME = 50; + +const STUCK_JOB_COUNT = 0; +const WAVES = [ + makeWave([1]), + makeWave(new Array(1000).fill(1), 10), + makeWave(new Array(1000).fill(1), 5), + makeWave(new Array(3000).fill(1), 1), + makeWave(new Array(5000).fill(1)), + makeWave(new Array(5000).fill(4)), + makeWave(new Array(200).fill(200)), + makeWave(Array.from({ length: 50 }, repeat([2000, 200, 20, 2])), 5), +]; + +/** @type {(arr: T[]) => (_: any, i: number) => T} */ +function repeat(arr) { + return (_, i) => arr[i % arr.length]; +} + +const taskIdentifier = "log_if_999"; + +const __dirname = new URL(".", import.meta.url).pathname; + +// run in this script's parent directory +process.chdir(__dirname); + +process.env.NO_LOG_SUCCESS = "1"; + +// if connection string not provided, assume postgres is available locally +process.env.PERF_DATABASE_URL ??= `${ + process.env.TEST_CONNECTION_STRING || "postgres:///graphile_worker_perftest" +}`; + +const env = { + ...process.env, + DATABASE_URL: process.env.PERF_DATABASE_URL, +}; + +/** @type {import("child_process").CommonExecOptions} */ +const execOptions = { + env, + stdio: ["ignore", "ignore", "inherit"], +}; + +/** @type {import("child_process").SpawnOptions} */ +const spawnOptions = { + env, + stdio: ["ignore", "inherit", "inherit"], + detached: false, +}; + +const pgPool = new pg.Pool({ connectionString: process.env.PERF_DATABASE_URL }); +pgPool.on("error", () => {}); +pgPool.on("connect", (client) => void client.on("error", () => {})); + +//const GENERAL_JOBS_PER_SECOND = 15000; +const GENERAL_JOBS_PER_SECOND = Math.min( + 15000, + CONCURRENCY * PARALLELISM * (1000 / (SLEEP_TIME + 0.1)), +); +const GENERAL_JOBS_PER_MILLISECOND = GENERAL_JOBS_PER_SECOND / 1000; + +/** @type {(jobBatches: number[], sleepDuration?: number) => (workerUtils: import("../dist/interfaces.js").WorkerUtils) => Promise} */ +function makeWave(jobBatches, extraSleepDuration = -1) { + return async (workerUtils) => { + let totalCount = 0; + let start = Date.now(); + for (let i = 0; i < jobBatches.length; i++) { + const NOW = new Date(); + const jobCount = jobBatches[i]; + /** @type {import("../dist/index.js").AddJobsJobSpec[]} */ + const jobs = []; + for (let i = 0; i < jobCount; i++) { + totalCount++; + jobs.push({ + identifier: taskIdentifier, + payload: { + id: i, + sleepTime: SLEEP_TIME, + }, + runAt: NOW, + }); + } + await workerUtils.addJobs(jobs); + const sleepDuration = + Math.floor((jobCount * SLEEP_TIME) / (CONCURRENCY * PARALLELISM)) + + extraSleepDuration; + if (sleepDuration >= 0) { + await sleep(sleepDuration); + } + } + + // Give roughly enough time for the jobs to complete + const estimatedExecutionTime = totalCount / GENERAL_JOBS_PER_MILLISECOND; + + const elapsed = Date.now() - start; + const timeToSleep = estimatedExecutionTime - elapsed; + if (timeToSleep > 0) { + await sleep(timeToSleep); + } + + // And wait for the jobs table to be empty + const MAX_ATTEMPTS = 20; + for (let attempts = 0; attempts < MAX_ATTEMPTS; attempts++) { + const { + rows: [{ count }], + } = await pgPool.query( + `select count(*) from graphile_worker.jobs where task_identifier <> 'stuck';`, + ); + if (count === "0") { + break; + } + if (attempts === MAX_ATTEMPTS - 1) { + throw new Error(`Expected 0 jobs, got ${count}`); + } else { + await sleep(50 * (attempts + 1) ** 1.5); + } + } + }; +} + +/** @type {(ms: number) => Promise} */ +const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); + +/** @type {(cb: () => any) => Promise} */ 
+const time = async (cb) => { + const start = process.hrtime(); + await cb(); + const diff = process.hrtime(start); + const dur = diff[0] * 1e3 + diff[1] * 1e-6; + console.log(`... it took ${dur.toFixed(0)}ms`); + return dur; +}; + +async function main() { + console.log("Building"); + execSync("yarn prepack", execOptions); + + console.log("Dropping and recreating the test database"); + execSync("node ../perfTest/recreateDb.js", execOptions); + + console.log("Installing the schema"); + execSync("node ../dist/cli.js --schema-only", execOptions); + + const workerUtils = await makeWorkerUtils({ + pgPool, + }); + + if (STUCK_JOB_COUNT > 0) { + console.log(`Scheduling ${STUCK_JOB_COUNT} stuck jobs`); + await time(() => { + execSync( + `node ../perfTest/init.js ${STUCK_JOB_COUNT} stuck`, + execOptions, + ); + }); + } + + console.log(); + console.log(); + console.log(`Spawning ${PARALLELISM} workers...`); + /** @type {import("child_process").PromiseWithChild[]} */ + const workerPromises = []; + for (let i = 0; i < PARALLELISM; i++) { + const child = spawn(`node`, [`../dist/cli.js`], spawnOptions); + const promise = Object.assign( + new Promise((resolve, reject) => { + child.on("error", reject); + child.on("exit", resolve); + }), + { child }, + ); + workerPromises.push(promise); + } + + const allDone = Promise.all(workerPromises).then( + () => { + console.log("All workers exited cleanly"); + }, + (e) => { + /** @type {import("child_process").ExecException} */ + const err = e; + if (err.signal === "SIGTERM") { + // all good; we terminated it + } else { + console.dir(err); + process.exit(1); + } + }, + ); + + await sleep(2000); + console.log("The wait is over... starting the attack"); + console.log(); + console.log(); + + for (let waveNumber = 0; waveNumber < WAVES.length; waveNumber++) { + const wave = WAVES[waveNumber]; + console.log(`Wave ${waveNumber + 1}...`); + await wave(workerUtils); + console.log(); + console.log(); + } + + console.log("Waves complete; waiting for workers to finish"); + for (const { child } of workerPromises) { + child.kill("SIGTERM"); + } + + await allDone; + + console.log("Exiting"); +} + +main().catch((e) => { + console.error(e); + process.exit(1); +}); diff --git a/towerDefence/tasks/log_if_999.js b/towerDefence/tasks/log_if_999.js new file mode 100644 index 00000000..bcda790d --- /dev/null +++ b/towerDefence/tasks/log_if_999.js @@ -0,0 +1,9 @@ +const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); +module.exports = ({ id, sleepTime }) => { + if (id === 999) { + console.log("Found 999!"); + } + if (sleepTime) { + return sleep(sleepTime); + } +}; diff --git a/website/docs/performance.md b/website/docs/performance.md index e4f824db..8e8ed7f3 100644 --- a/website/docs/performance.md +++ b/website/docs/performance.md @@ -3,72 +3,183 @@ title: "Performance" sidebar_position: 120 --- +## Quick stats + +Quick stats in optimial conditions: + +- jobs executed per second: ~183,000 +- average latency from add_job to job execution start: 4.16ms (max: 13.84ms) +- jobs queued per second from single add_jobs batch call: ~202,000 +- time to start and immediately shut down the worker: 68ms + +The above stats were achieved with this configuration: + +```ts +const preset = { + worker: { + connectionString: "postgres:///graphile_worker_perftest", + fileExtensions: [".js", ".cjs", ".mjs"], + + concurrentJobs: 24, + maxPoolSize: 25, + + // Batching options (see below) + localQueue: { size: 500 }, + completeJobBatchDelay: 0, + failJobBatchDelay: 0, + }, +}; +``` + +## 
Performance statement + `graphile-worker` is not intended to replace extremely high performance -dedicated job queues, it's intended to be a very easy way to get a -reasonably performant job queue up and running with Node.js and PostgreSQL. But -this doesn't mean it's a slouch by any means — it achieves an -average latency from triggering a job in one process to executing it in another -of under 3ms, and a 12-core database server can queue around 99,600 jobs per -second and can process around 11,800 jobs per second. +dedicated job queues for Facebook scale, it's intended to give regular +organizations the fastest and easiest to set up job queue we can achieve without +needing to expand your infrastructure beyond Node.js and PostgreSQL. But this +doesn't mean it's a slouch by any means — it achieves an average +latency from triggering a job in one process to executing it in another of under +5ms, and a well-specced database server can queue around 172,000 jobs per second +from a single PostgreSQL client, and can process around 196k jobs per second +using a pool of 4 Graphile Worker instances, each with concurrency set to 24. +For many organizations, this is more than they'll ever need. + +## Horizontal scaling `graphile-worker` is horizontally scalable to a point. Each instance has a customizable worker pool, this pool defaults to size 1 (only one job at a time on this worker) but depending on the nature of your tasks (i.e. assuming they're not compute-heavy) you will likely want to set this higher to benefit from Node.js' concurrency. If your tasks are compute heavy you may -still wish to set it higher and then using Node's `child_process` (or Node -v11's `worker_threads`) to share the compute load over multiple cores -without significantly impacting the main worker's run loop. Note, however, -that Graphile Worker is limited by the performance of the underlying Postgres +still wish to set it higher and then using Node's `child_process` or +`worker_threads` to share the compute load over multiple cores without +significantly impacting the main worker's run loop. + +## Enabling batching for highest performance + +Graphile Worker is limited by the performance of the underlying Postgres database, and when you hit this limit performance will start to go down (rather than up) as you add more workers. -To test performance, you can run `yarn perfTest`. This runs three tests: +To mitigate this, we've added batching functionality to many of the internal +methods which you can enable via the configuration. For example using a local +queue enables each pool to pull down a configurable number of jobs up front so +its workers can start a new job the moment their previous one completes without +having to request a new job from the database. This batching also reduces load +on the database since there are fewer total queries per second, but it's a +slight trade-off since more jobs are checked out but not necessarily actively +being worked on, so latency may increase and in the event of a crash more jobs +will be locked. + +## Running the performance tests + +To test performance, you can check out the repository and then run +`yarn perfTest`. This runs three tests: 1. a startup/shutdown test to see how fast the worker can startup and exit if there's no jobs queued (this includes connecting to the database and ensuring the migrations are up to date) -2. a load test — by default this will run 20,000 +2. 
a load test — by default this will run 200,000 [trivial](https://github.com/graphile/worker/blob/main/perfTest/tasks/log_if_999.js) - jobs with a parallelism of 4 (i.e. 4 node processes) and a concurrency of 10 - (i.e. 10 concurrent jobs running on each node process), but you can configure - this in `perfTest/run.js`. (These settings were optimized for a 12-core - hyper-threading machine running both the tests and the database locally.) + jobs with a parallelism of 4 (i.e. 4 node processes) and a concurrency of 24 + (i.e. 24 concurrent jobs running on each node process), but you can configure + this in `perfTest/run.js`. (These settings were optimized for a Intel + i9-14900K with efficiency cores disabled and running both the tests and the + database locally.) 3. a latency test — determining how long between issuing an `add_job` command and the task itself being executed. ## perfTest results: -The test was ran on a 12-core AMD Ryzen 3900 with an M.2 SSD, running both the -workers and the database (and a tonne of Chrome tabs, electron apps, and what -not). Jobs=20000, parallelism=4, concurrency=10. +Executed on +[this machine](https://uk.pcpartpicker.com/user/BenjieGillam/saved/#view=BjtCrH), +running both the workers and the database (and a tonne of Chrome tabs, electron +apps, and what not). + +### With batching + +**Jobs per second: ~184,000** -Conclusion: +```ts +const preset = { + worker: { + connectionString: "postgres:///graphile_worker_perftest", + fileExtensions: [".js", ".cjs", ".mjs"], -- Startup/shutdown: 110ms -- Jobs per second: 11,851 -- Average latency: 2.66ms (min: 2.39ms, max: 12.09ms) + concurrentJobs: 24, + maxPoolSize: 25, + + // Batching options (see below) + localQueue: { size: 500 }, + completeJobBatchDelay: 0, + failJobBatchDelay: 0, + }, +}; +``` ``` Timing startup/shutdown time... -... it took 110ms +... it took 68ms + +Scheduling 200000 jobs +Adding jobs: 988.425ms +... it took 1160ms + + +Timing 200000 job execution... +Found 999! + +... it took 1156ms +Jobs per second: 183895.49 + +Testing latency... +[core] INFO: Worker connected and looking for jobs... (task names: 'latency') +Beginning latency test +Latencies - min: 3.24ms, max: 18.18ms, avg: 4.28ms +``` + +### Without batching + +**Jobs per second: ~15,600** + +```ts +const preset = { + worker: { + connectionString: "postgres:///graphile_worker_perftest", + fileExtensions: [".js", ".cjs", ".mjs"], + + concurrentJobs: 24, + maxPoolSize: 25, + + // Batching disabled (default) + localQueue: { size: -1 }, + completeJobBatchDelay: -1, + failJobBatchDelay: -1, + }, +}; +``` + +``` +Timing startup/shutdown time... +... it took 77ms + -Scheduling 20000 jobs -Adding jobs: 200.84ms -... it took 287ms +Scheduling 200000 jobs +Adding jobs: 992.368ms +... it took 1163ms -Timing 20000 job execution... +Timing 200000 job execution... Found 999! -... it took 1797ms -Jobs per second: 11851.90 +... it took 12892ms +Jobs per second: 15606.79 Testing latency... [core] INFO: Worker connected and looking for jobs... (task names: 'latency') Beginning latency test -Latencies - min: 2.39ms, max: 12.09ms, avg: 2.66ms +Latencies - min: 3.40ms, max: 14.13ms, avg: 4.47ms ``` TODO: post perfTest results in a more reasonable configuration, e.g. 
using an diff --git a/yarn.lock b/yarn.lock index d81fd84d..adf1323a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6355,10 +6355,10 @@ graphemer@^1.4.0: resolved "https://registry.yarnpkg.com/graphemer/-/graphemer-1.4.0.tgz#fb2f1d55e0e3a1849aeffc90c4fa0dd53a0e66c6" integrity sha512-EtKwoO6kxCL9WO5xipiHTZlSzBm7WLT627TqC/uVRd0HKmq8NXyebnNYxDoBi7wt8eTWrUrKXCOVaFq9x1kgag== -graphile-config@^0.0.1-beta.4, graphile-config@^0.0.1-beta.11: - version "0.0.1-beta.11" - resolved "https://registry.yarnpkg.com/graphile-config/-/graphile-config-0.0.1-beta.11.tgz#4bd2ffd1fee6834f2e5dedc64016e7a3a9eda151" - integrity sha512-+2QLPpihQQvSYd6sSXcDrwHMMSygUrK41qWhak7u3vsXj2AGwVwl+kVvlBwuoovaoUPDsGF8zy5IevTAMgzg5Q== +graphile-config@^0.0.1-beta.14, graphile-config@^0.0.1-beta.4: + version "0.0.1-beta.14" + resolved "https://registry.yarnpkg.com/graphile-config/-/graphile-config-0.0.1-beta.14.tgz#6238ad5960ccc20b19718726da7c3b1a6c48d831" + integrity sha512-3FlhyRKz4LvIbY4AXn4EI8DSTdSYsg0WRfX6U9QeytGta9aiefF1QqSiC1ocXUlNJUMBfm28dy0eL669ljYRwg== dependencies: "@types/interpret" "^1.1.1" "@types/node" "^20.5.7"
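
For readers who want to enable the batching features introduced above outside of the perfTest/towerDefence harnesses, the sketch below is not part of the diff; it simply restates the settings documented in `website/docs/performance.md` as a standalone `graphile.config.mjs`. The connection string, concurrency, and queue size are illustrative placeholders rather than recommendations.

```js
// graphile.config.mjs — minimal sketch of the batching options documented in
// website/docs/performance.md above. Values are illustrative placeholders.

/** @type {GraphileConfig.Preset} */
const preset = {
  worker: {
    // Placeholder connection string; point this at your own database.
    connectionString: process.env.DATABASE_URL || "postgres:///my_app",

    concurrentJobs: 24,
    maxPoolSize: 25, // one client per concurrent job, plus one spare

    // Local queue: the pool fetches up to `size` jobs ahead of time so a
    // worker can start its next job without waiting on a database round trip.
    localQueue: { size: 500 },

    // Batch completion/failure updates; -1 (the default) disables batching.
    completeJobBatchDelay: 0,
    failJobBatchDelay: 0,
  },
};

export default preset;
```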