Skip to content

Commit 3e87e12

Browse files
authored
Extract various minor enhancements from #474 (#513)
2 parents 7b31a89 + cef1de1 commit 3e87e12

17 files changed

+166
-82
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ node_modules/
88
/tasks/
99
/rewired/
1010
_LOCAL/
11+
/graphile-pro-worker

__tests__/migrate.test.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import {
1414

1515
const options: WorkerSharedOptions = {};
1616

17+
const MAX_MIGRATION_NUMBER = 18;
18+
1719
test("migration installs schema; second migration does no harm", async () => {
1820
await withPgClient(async (pgClient) => {
1921
await pgClient.query(
@@ -40,7 +42,7 @@ test("migration installs schema; second migration does no harm", async () => {
4042
const { rows: migrationRows } = await pgClient.query(
4143
`select * from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.migrations order by id asc`,
4244
);
43-
expect(migrationRows).toHaveLength(18);
45+
expect(migrationRows).toHaveLength(MAX_MIGRATION_NUMBER);
4446
const migration = migrationRows[0];
4547
expect(migration.id).toEqual(1);
4648

@@ -90,16 +92,17 @@ test("multiple concurrent installs of the schema is fine", async () => {
9092
);
9193
}
9294
} finally {
93-
await Promise.allSettled(promises);
95+
const results = await Promise.allSettled(promises);
9496
await Promise.allSettled(clients.map((c) => c.release()));
97+
expect(results.every((r) => r.status === "fulfilled")).toBeTruthy();
9598
}
9699
});
97100
await withPgClient(async (pgClient) => {
98101
// Assert migrations table exists and has relevant entries
99102
const { rows: migrationRows } = await pgClient.query(
100103
`select * from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.migrations order by id asc`,
101104
);
102-
expect(migrationRows).toHaveLength(18);
105+
expect(migrationRows).toHaveLength(MAX_MIGRATION_NUMBER);
103106
const migration = migrationRows[0];
104107
expect(migration.id).toEqual(1);
105108

@@ -147,7 +150,7 @@ insert into ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.migrations (id) values (1);
147150
const { rows: migrationRows } = await pgClient.query(
148151
`select * from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.migrations order by id asc`,
149152
);
150-
expect(migrationRows.length).toBeGreaterThanOrEqual(18);
153+
expect(migrationRows.length).toBeGreaterThanOrEqual(MAX_MIGRATION_NUMBER);
151154
const migration2 = migrationRows[1];
152155
expect(migration2.id).toEqual(2);
153156
expect(migration2.breaking).toEqual(false);
@@ -208,7 +211,7 @@ test("aborts if database is more up to date than current worker", async () => {
208211
await expect(
209212
migrate(compiledSharedOptions, pgClient),
210213
).rejects.toThrowErrorMatchingInlineSnapshot(
211-
`"Database is using Graphile Worker schema revision 999999 which includes breaking migration 999999, but the currently running worker only supports up to revision 18. It would be unsafe to continue; please ensure all versions of Graphile Worker are compatible."`,
214+
`"Database is using Graphile Worker schema revision 999999 which includes breaking migration 999999, but the currently running worker only supports up to revision ${MAX_MIGRATION_NUMBER}. It would be unsafe to continue; please ensure all versions of Graphile Worker are compatible."`,
212215
);
213216
});
214217
});

__tests__/runner.helpers.getTaskName.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ beforeAll(() => {
1818
connectionString: databaseDetails!.TEST_CONNECTION_STRING,
1919
max: JOB_COUNT * 2 + 5,
2020
});
21+
pgPool.on("error", () => {});
22+
pgPool.on("connect", () => {});
2123
});
2224
afterAll(() => {
2325
pgPool.end();

__tests__/runner.runOnce.test.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { Pool } from "pg";
22

33
import { makeWorkerPresetWorkerOptions } from "../src/config";
44
import { Job, RunnerOptions, WorkerUtils } from "../src/interfaces";
5+
import { coerceError } from "../src/lib";
56
import { _allWorkerPools } from "../src/main";
67
import { WorkerPreset } from "../src/preset";
78
import { runOnce } from "../src/runner";
@@ -55,7 +56,8 @@ async function runOnceErrorAssertion(
5556
expect.assertions(1);
5657
try {
5758
await runOnce(options);
58-
} catch (e) {
59+
} catch (rawE) {
60+
const e = coerceError(rawE);
5961
expect(e.message).toMatch(message);
6062
}
6163
}

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,11 @@
4949
"homepage": "https://github.com/graphile/worker#readme",
5050
"dependencies": {
5151
"@graphile/logger": "^0.2.0",
52+
"@tsconfig/node18": "^18.2.4",
5253
"@types/debug": "^4.1.10",
5354
"@types/pg": "^8.10.5",
5455
"cosmiconfig": "^8.3.6",
55-
"graphile-config": "^0.0.1-beta.4",
56+
"graphile-config": "^0.0.1-beta.11",
5657
"json5": "^2.2.3",
5758
"pg": "^8.11.3",
5859
"tslib": "^2.6.2",

src/cron.ts

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,12 @@ import {
1616
TimestampDigest,
1717
WorkerEvents,
1818
} from "./interfaces";
19-
import { CompiledOptions, CompiledSharedOptions, Releasers } from "./lib";
19+
import {
20+
coerceError,
21+
CompiledOptions,
22+
CompiledSharedOptions,
23+
Releasers,
24+
} from "./lib";
2025

2126
interface CronRequirements {
2227
pgPool: Pool;
@@ -333,19 +338,19 @@ export const runCron = (
333338
}
334339

335340
const start = new Date();
336-
events.emit("cron:starting", { cron: this, start });
341+
events.emit("cron:starting", { cron, start });
337342

338343
// We must backfill BEFORE scheduling any new jobs otherwise backfill won't
339344
// work due to known_crontabs.last_execution having been updated.
340345
await registerAndBackfillItems(
341-
{ pgPool, events, cron: this },
346+
{ pgPool, events, cron },
342347
escapedWorkerSchema,
343348
parsedCronItems,
344349
new Date(+start),
345350
useNodeTime,
346351
);
347352

348-
events.emit("cron:started", { cron: this, start });
353+
events.emit("cron:started", { cron, start });
349354

350355
if (!cron._active) {
351356
return stop();
@@ -406,7 +411,7 @@ export const runCron = (
406411
},
407412
);
408413
events.emit("cron:prematureTimer", {
409-
cron: this,
414+
cron,
410415
currentTimestamp,
411416
expectedTimestamp,
412417
});
@@ -422,7 +427,7 @@ export const runCron = (
422427
)}s behind)`,
423428
);
424429
events.emit("cron:overdueTimer", {
425-
cron: this,
430+
cron,
426431
currentTimestamp,
427432
expectedTimestamp,
428433
});
@@ -444,7 +449,7 @@ export const runCron = (
444449
// Finally actually run the jobs.
445450
if (jobsAndIdentifiers.length) {
446451
events.emit("cron:schedule", {
447-
cron: this,
452+
cron,
448453
timestamp: expectedTimestamp,
449454
jobsAndIdentifiers,
450455
});
@@ -456,7 +461,7 @@ export const runCron = (
456461
useNodeTime,
457462
);
458463
events.emit("cron:scheduled", {
459-
cron: this,
464+
cron,
460465
timestamp: expectedTimestamp,
461466
jobsAndIdentifiers,
462467
});
@@ -475,7 +480,7 @@ export const runCron = (
475480
} catch (e) {
476481
// If something goes wrong; abort. The calling code should re-schedule
477482
// which will re-trigger the backfilling code.
478-
return stop(e);
483+
return stop(coerceError(e));
479484
}
480485
}
481486

src/crontab.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import {
2121
CronItemOptions,
2222
ParsedCronItem,
2323
} from "./interfaces";
24+
import { coerceError } from "./lib";
2425

2526
/**
2627
* Returns a period of time in milliseconds representing the time phrase given.
@@ -179,7 +180,9 @@ const parseCrontabPayload = (
179180
return JSON5.parse(payloadString);
180181
} catch (e) {
181182
throw new Error(
182-
`Failed to parse JSON5 payload on line ${lineNumber} of crontab: ${e.message}`,
183+
`Failed to parse JSON5 payload on line ${lineNumber} of crontab: ${
184+
coerceError(e).message
185+
}`,
183186
);
184187
}
185188
};

src/deferred.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@ export interface Deferred<T = void> extends Promise<T> {
44
}
55

66
export default function defer<T = void>(): Deferred<T> {
7-
let resolve: (result?: T | PromiseLike<T>) => void;
8-
let reject: (error: Error) => void;
7+
let resolve: Deferred<T>["resolve"];
8+
let reject: Deferred<T>["reject"];
99
return Object.assign(
1010
new Promise<T>((_resolve, _reject) => {
11-
resolve = _resolve;
11+
resolve = _resolve as Deferred<T>["resolve"];
1212
reject = _reject;
1313
}),
1414
// @ts-ignore error TS2454: Variable 'resolve' is used before being assigned.

src/interfaces.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -908,7 +908,6 @@ export type WorkerEventMap = {
908908
"pool:listen:error": {
909909
workerPool: WorkerPool;
910910
error: unknown;
911-
client: PoolClient;
912911
};
913912

914913
/**

src/lib.ts

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,17 @@ export async function assertPool(
300300
pgPool = _rawOptions.pgPool;
301301
if (pgPool.listeners("error").length === 0) {
302302
console.warn(
303-
`Your pool doesn't have error handlers! See: https://err.red/wpeh`,
303+
`Your pool doesn't have error handlers! See: https://err.red/wpeh?v=${encodeURIComponent(
304+
version,
305+
)}`,
306+
);
307+
installErrorHandlers(compiledSharedOptions, releasers, pgPool);
308+
}
309+
if (pgPool.listeners("connect").length === 0) {
310+
console.warn(
311+
`Your pool doesn't have all of the error handlers! See: https://err.red/wpeh?v=${encodeURIComponent(
312+
version,
313+
)}&method=connect`,
304314
);
305315
installErrorHandlers(compiledSharedOptions, releasers, pgPool);
306316
}
@@ -400,7 +410,7 @@ export async function withReleasers<T>(
400410
try {
401411
await releasers[i]();
402412
} catch (e) {
403-
firstError = firstError || e;
413+
firstError ??= coerceError(e);
404414
}
405415
}
406416
if (firstError) {
@@ -530,7 +540,8 @@ export function makeEnhancedWithPgClient(
530540
for (let attempts = 0; attempts < MAX_RETRIES; attempts++) {
531541
try {
532542
return await withPgClient(...args);
533-
} catch (e) {
543+
} catch (rawE) {
544+
const e = coerceError(rawE);
534545
const retryable = RETRYABLE_ERROR_CODES.find(
535546
({ code }) => code === e.code,
536547
);
@@ -551,3 +562,20 @@ export function makeEnhancedWithPgClient(
551562

552563
export const sleep = (ms: number) =>
553564
new Promise<void>((resolve) => setTimeout(resolve, ms));
565+
566+
export function coerceError(err: unknown): Error & { code?: unknown } {
567+
if (err instanceof Error) {
568+
return err;
569+
} else {
570+
const message =
571+
typeof err === "object" && err !== null && "message" in err
572+
? String(err.message)
573+
: "An error occurred";
574+
return new Error(message, { cause: err });
575+
}
576+
}
577+
578+
export function isPromiseLike<T>(v: PromiseLike<T> | T): v is PromiseLike<T> {
579+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
580+
return v != null && typeof (v as any).then === "function";
581+
}

0 commit comments

Comments
 (0)