Skip to content

Commit 7b31a89

Browse files
authored
Add abortPromise as a helper alongside abortSignal (#512)
2 parents 4d71970 + 49b7266 commit 7b31a89

File tree

12 files changed

+226
-35
lines changed

12 files changed

+226
-35
lines changed

RELEASE_NOTES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ Read more:
3939
output via logging (thanks @wineTGH).
4040
- Fix race condition when multiple workers attempt to initialise the database at
4141
the same time
42+
- `helpers.abortSignal` is no longer typed as `| undefined`. It is still
43+
experimental!
44+
- `helpers.abortPromise` added; will reject when `abortSignal` aborts (useful
45+
for `Promise.race()`)
4246

4347
## v0.16.6
4448

__tests__/getTasks.test.ts

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,12 @@ import { makeMockJob, withPgClient } from "./helpers";
1010

1111
const options: WorkerSharedOptions = {};
1212

13+
const neverAbortController = new AbortController();
14+
const abortSignal = neverAbortController.signal;
15+
const abortPromise = new Promise<void>((_, reject) => {
16+
abortSignal.addEventListener("abort", reject);
17+
});
18+
1319
describe("commonjs", () => {
1420
test("gets tasks from folder", () =>
1521
withPgClient(async (client) => {
@@ -32,7 +38,8 @@ Array [
3238
withPgClient: makeEnhancedWithPgClient(
3339
makeWithPgClientFromClient(client),
3440
),
35-
abortSignal: undefined,
41+
abortSignal,
42+
abortPromise,
3643
},
3744
);
3845
expect(await tasks.wouldyoulike!(helpers.job.payload, helpers)).toEqual(
@@ -68,7 +75,8 @@ Array [
6875
withPgClient: makeEnhancedWithPgClient(
6976
makeWithPgClientFromClient(client),
7077
),
71-
abortSignal: undefined,
78+
abortSignal,
79+
abortPromise,
7280
},
7381
);
7482
expect(await tasks.task1!(helpers.job.payload, helpers)).toEqual("hi");
@@ -98,7 +106,8 @@ Array [
98106
withPgClient: makeEnhancedWithPgClient(
99107
makeWithPgClientFromClient(client),
100108
),
101-
abortSignal: undefined,
109+
abortSignal,
110+
abortPromise,
102111
},
103112
);
104113
expect(await tasks.task1!(helpers.job.payload, helpers)).toEqual("hi");
@@ -127,7 +136,8 @@ Array [
127136
withPgClient: makeEnhancedWithPgClient(
128137
makeWithPgClientFromClient(client),
129138
),
130-
abortSignal: undefined,
139+
abortSignal,
140+
abortPromise,
131141
});
132142
expect(await tasks.t1!(helpers.job.payload, helpers)).toEqual(
133143
"come with me",
@@ -157,7 +167,8 @@ Array [
157167
withPgClient: makeEnhancedWithPgClient(
158168
makeWithPgClientFromClient(client),
159169
),
160-
abortSignal: undefined,
170+
abortSignal,
171+
abortPromise,
161172
});
162173
expect(await tasks.t1!(helpers.job.payload, helpers)).toEqual(
163174
"come with me, TS",
@@ -191,7 +202,8 @@ Array [
191202
withPgClient: makeEnhancedWithPgClient(
192203
makeWithPgClientFromClient(client),
193204
),
194-
abortSignal: undefined,
205+
abortSignal,
206+
abortPromise,
195207
},
196208
);
197209
expect(await tasks.wouldyoulike!(helpers.job.payload, helpers)).toEqual(
@@ -224,7 +236,8 @@ Array [
224236
withPgClient: makeEnhancedWithPgClient(
225237
makeWithPgClientFromClient(client),
226238
),
227-
abortSignal: undefined,
239+
abortSignal,
240+
abortPromise,
228241
},
229242
);
230243
expect(await tasks.task1!(helpers.job.payload, helpers)).toEqual("hi");
@@ -251,7 +264,8 @@ Array [
251264
withPgClient: makeEnhancedWithPgClient(
252265
makeWithPgClientFromClient(client),
253266
),
254-
abortSignal: undefined,
267+
abortSignal,
268+
abortPromise,
255269
});
256270
expect(await tasks.t1!(helpers.job.payload, helpers)).toEqual(
257271
"come with me",

__tests__/helpers.ts

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -101,14 +101,16 @@ export async function withPgPool<T>(
101101
cb: (pool: pg.Pool) => Promise<T>,
102102
): Promise<T> {
103103
const { TEST_CONNECTION_STRING } = databaseDetails!;
104-
const pool = new pg.Pool({
104+
const pgPool = new pg.Pool({
105105
connectionString: TEST_CONNECTION_STRING,
106106
max: 100,
107107
});
108+
pgPool.on("error", () => {});
109+
pgPool.on("connect", () => {});
108110
try {
109-
return await cb(pool);
111+
return await cb(pgPool);
110112
} finally {
111-
pool.end();
113+
pgPool.end();
112114
}
113115
}
114116

@@ -298,14 +300,22 @@ export function makeMockJob(taskIdentifier: string): Job {
298300

299301
export async function makeSelectionOfJobs(
300302
utils: WorkerUtils,
301-
pgClient: pg.PoolClient,
303+
pgClient: pg.PoolClient | pg.Pool,
302304
) {
303305
const future = new Date(Date.now() + 60 * 60 * 1000);
304-
const failedJob: DbJob = await utils.addJob("job3", { a: 1, runAt: future });
305-
const regularJob1 = await utils.addJob("job3", { a: 2, runAt: future });
306-
const lockedJob: DbJob = await utils.addJob("job3", { a: 3, runAt: future });
307-
const regularJob2 = await utils.addJob("job3", { a: 4, runAt: future });
308-
const untouchedJob = await utils.addJob("job3", { a: 5, runAt: future });
306+
const failedJob: DbJob = await utils.addJob(
307+
"job3",
308+
{ a: 1 },
309+
{ runAt: future },
310+
);
311+
const regularJob1 = await utils.addJob("job3", { a: 2 }, { runAt: future });
312+
const lockedJob: DbJob = await utils.addJob(
313+
"job3",
314+
{ a: 3 },
315+
{ runAt: future },
316+
);
317+
const regularJob2 = await utils.addJob("job3", { a: 4 }, { runAt: future });
318+
const untouchedJob = await utils.addJob("job3", { a: 5 }, { runAt: future });
309319
const {
310320
rows: [lockedJobUpdate],
311321
} = await pgClient.query<DbJob>(

__tests__/main.runTaskList.test.ts

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@
22
import { Pool } from "pg";
33

44
import deferred, { Deferred } from "../src/deferred";
5-
import { Task, TaskList, WorkerSharedOptions } from "../src/interfaces";
5+
import { Job, Task, TaskList, WorkerSharedOptions } from "../src/interfaces";
66
import { runTaskList } from "../src/main";
77
import {
88
ESCAPED_GRAPHILE_WORKER_SCHEMA,
99
expectJobCount,
10+
getJobs,
1011
reset,
1112
sleep,
1213
sleepUntil,
@@ -100,3 +101,35 @@ test("doesn't bail on deprecated `debug` function", () =>
100101
}
101102
}
102103
}));
104+
105+
test("gracefulShutdown", async () =>
106+
withPgPool(async (pgPool) => {
107+
let jobStarted = false;
108+
const tasks: TaskList = {
109+
job1(payload, helpers) {
110+
jobStarted = true;
111+
return Promise.race([sleep(100000, true), helpers.abortPromise]);
112+
},
113+
};
114+
const workerPool = runTaskList(
115+
{ concurrency: 3, gracefulShutdownAbortTimeout: 20, useNodeTime: true },
116+
tasks,
117+
pgPool,
118+
);
119+
await addJob(pgPool);
120+
await sleepUntil(() => jobStarted);
121+
await workerPool.gracefulShutdown();
122+
await workerPool.promise;
123+
let jobs: Job[] = [];
124+
for (let attempts = 0; attempts < 10; attempts++) {
125+
jobs = await getJobs(pgPool);
126+
if (jobs[0]?.last_error) {
127+
break;
128+
} else {
129+
await sleep(25 * attempts);
130+
}
131+
}
132+
expect(jobs).toHaveLength(1);
133+
const [job] = jobs;
134+
expect(job.last_error).toBeTruthy();
135+
}));

__tests__/runner.runOnce.test.ts

Lines changed: 105 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,20 @@
11
import { Pool } from "pg";
22

33
import { makeWorkerPresetWorkerOptions } from "../src/config";
4-
import { RunnerOptions } from "../src/interfaces";
4+
import { Job, RunnerOptions, WorkerUtils } from "../src/interfaces";
5+
import { _allWorkerPools } from "../src/main";
56
import { WorkerPreset } from "../src/preset";
67
import { runOnce } from "../src/runner";
7-
import { databaseDetails, withPgPool } from "./helpers";
8+
import { makeWorkerUtils } from "../src/workerUtils";
9+
import {
10+
databaseDetails,
11+
getJobs,
12+
makeSelectionOfJobs,
13+
reset,
14+
sleep,
15+
sleepUntil,
16+
withPgPool,
17+
} from "./helpers";
818

919
delete process.env.DATABASE_URL;
1020
delete process.env.PGDATABASE;
@@ -83,10 +93,13 @@ test("at least a connectionString, a pgPool, the DATABASE_URL or PGDATABASE envv
8393
});
8494

8595
test("connectionString and a pgPool cannot provided a the same time", async () => {
96+
const pgPool = new Pool();
97+
pgPool.on("error", () => {});
98+
pgPool.on("connect", () => {});
8699
const options: RunnerOptions = {
87100
taskList: { task: () => {} },
88101
connectionString: databaseDetails!.TEST_CONNECTION_STRING,
89-
pgPool: new Pool(),
102+
pgPool,
90103
};
91104
await runOnceErrorAssertion(
92105
options,
@@ -141,3 +154,92 @@ test("providing just a pgPool is possible", async () =>
141154
expect.assertions(0);
142155
await runOnce(options);
143156
}));
157+
158+
let utils: WorkerUtils | null = null;
159+
afterEach(async () => {
160+
await utils?.release();
161+
utils = null;
162+
});
163+
164+
test("runs all available tasks and then exits", async () =>
165+
withPgPool(async (pgPool) => {
166+
const options: RunnerOptions = {
167+
taskList: { job1: () => {}, job2: () => {}, job3: () => {} },
168+
pgPool: pgPool,
169+
useNodeTime: true,
170+
};
171+
utils = await makeWorkerUtils(options);
172+
await utils.addJob("job1", { id: "PRE_SELECTION_1" });
173+
await utils.addJob("job2", { id: "PRE_SELECTION_2" });
174+
await utils.addJob("job3", { id: "PRE_SELECTION_3" });
175+
const unavailableJobs = Object.values(
176+
await makeSelectionOfJobs(utils, pgPool),
177+
);
178+
await utils.addJob("job1", { id: "POST_SELECTION_1" });
179+
await utils.addJob("job2", { id: "POST_SELECTION_2" });
180+
await utils.addJob("job3", { id: "POST_SELECTION_3" });
181+
{
182+
const jobs = await getJobs(pgPool);
183+
expect(jobs).toHaveLength(unavailableJobs.length + 6);
184+
}
185+
await runOnce(options);
186+
{
187+
const unavailableJobIds = unavailableJobs.map((j) => j.id);
188+
let jobs!: Job[];
189+
for (let attempts = 0; attempts < 10; attempts++) {
190+
jobs = await getJobs(pgPool);
191+
if (jobs.length === unavailableJobs.length) {
192+
break;
193+
} else {
194+
await sleep(attempts * 50);
195+
}
196+
}
197+
expect(jobs).toHaveLength(unavailableJobs.length);
198+
expect(
199+
jobs.filter((j) => !unavailableJobIds.includes(j.id)),
200+
).toHaveLength(0);
201+
}
202+
}));
203+
204+
test("gracefulShutdown", async () =>
205+
withPgPool(async (pgPool) => {
206+
let jobStarted = false;
207+
const options: RunnerOptions = {
208+
taskList: {
209+
job1(payload, helpers) {
210+
jobStarted = true;
211+
return Promise.race([sleep(100000, true), helpers.abortPromise]);
212+
},
213+
},
214+
pgPool,
215+
preset: {
216+
worker: {
217+
gracefulShutdownAbortTimeout: 20,
218+
useNodeTime: true,
219+
},
220+
},
221+
};
222+
await reset(pgPool, options);
223+
utils = await makeWorkerUtils(options);
224+
await utils.addJob("job1", { id: "test sleep" });
225+
expect(_allWorkerPools).toHaveLength(0);
226+
const promise = runOnce(options);
227+
await sleepUntil(() => _allWorkerPools.length === 1);
228+
expect(_allWorkerPools).toHaveLength(1);
229+
const pool = _allWorkerPools[0];
230+
await sleepUntil(() => jobStarted);
231+
await pool.gracefulShutdown();
232+
await promise;
233+
let jobs: Job[] = [];
234+
for (let attempts = 0; attempts < 10; attempts++) {
235+
jobs = await getJobs(pgPool);
236+
if (jobs[0]?.last_error) {
237+
break;
238+
} else {
239+
await sleep(25 * attempts);
240+
}
241+
}
242+
expect(jobs).toHaveLength(1);
243+
const [job] = jobs;
244+
expect(job.last_error).toBeTruthy();
245+
}));

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"prettier:check": "prettier --cache --ignore-path .eslintignore --check '**/*.{js,jsx,ts,tsx,graphql,md,json}'",
1515
"test": "yarn prepack && yarn depcheck && yarn test:setupdb && yarn test:only",
1616
"test:setupdb": "./scripts/setup_template_db.sh",
17-
"test:only": "node --experimental-vm-modules node_modules/.bin/jest",
17+
"test:only": "NO_LOG_SUCCESS=1 node --experimental-vm-modules node_modules/.bin/jest",
1818
"depcheck": "depcheck --ignores='graphile-worker,faktory-worker,@google-cloud/tasks,bullmq,jest-environment-node,@docusaurus/*,@fortawesome/*,@mdx-js/*,@types/jest,clsx,eslint_d,graphile,juice,postcss-nested,prism-react-renderer,react,react-dom,svgo,ts-node,@types/debug,tslib'",
1919
"db:dump": "./scripts/dump_db",
2020
"perfTest": "cd perfTest && node ./run.js",
@@ -84,7 +84,7 @@
8484
"eslint_d": "^13.0.0",
8585
"graphile": "^5.0.0-beta.16",
8686
"jest": "^26.0.0",
87-
"jest-time-helpers": "0.1.0",
87+
"jest-time-helpers": "0.1.1",
8888
"juice": "5.2.0",
8989
"pg-connection-string": "^2.6.2",
9090
"postcss-nested": "^6.0.1",

src/helpers.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,10 +225,12 @@ export function makeJobHelpers(
225225
{
226226
withPgClient,
227227
abortSignal,
228+
abortPromise,
228229
logger: overrideLogger,
229230
}: {
230231
withPgClient: EnhancedWithPgClient;
231-
abortSignal: AbortSignal | undefined;
232+
abortSignal: AbortSignal;
233+
abortPromise: Promise<void>;
232234
logger?: Logger;
233235
},
234236
): JobHelpers {
@@ -240,6 +242,7 @@ export function makeJobHelpers(
240242
});
241243
const helpers: JobHelpers = {
242244
abortSignal,
245+
abortPromise,
243246
job,
244247
getQueueName(queueId = job.job_queue_id) {
245248
return getQueueName(compiledSharedOptions, withPgClient, queueId);

0 commit comments

Comments
 (0)