Skip to content

Commit c355616

Browse files
authored
Expose addJobs function via library interface (#509)
2 parents 93bf81b + 860fb67 commit c355616

File tree

15 files changed

+487
-22
lines changed

15 files changed

+487
-22
lines changed

RELEASE_NOTES.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@ Read more:
2525
than the `workerId`. Be sure to upgrade
2626
[Worker Pro](https://worker.graphile.org/docs/pro) at the same time if you're
2727
using it!
28+
- New `addJobs()` JS method to enable efficiently adding a batch of jobs via the
29+
JS API
30+
- DEPRECATION: `quickAddJob` has been renamed to `addJobAdhoc` to make it
31+
clearer that it's for use in one-off locations (some felt the "quick" referred
32+
to the speed it executed, rather than the amount of effort required from the
33+
programmer)
2834
- Fixes bug where CLI defaults override `graphile.config.js` settings (by
2935
removing CLI defaults)
3036
- Fix bug where executable tasks had their stdout/stderr ignored; this is now

__tests__/workerUtils.addJob.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import {
2+
addJobAdhoc,
23
makeWorkerUtils,
3-
quickAddJob,
44
runTaskListOnce,
55
Task,
66
WorkerSharedOptions,
@@ -140,7 +140,7 @@ test("runs a job added through the addJob shortcut function", () =>
140140
await reset(pgClient, options);
141141

142142
// Schedule a job
143-
await quickAddJob({ connectionString: TEST_CONNECTION_STRING }, "job3", {
143+
await addJobAdhoc({ connectionString: TEST_CONNECTION_STRING }, "job3", {
144144
a: 1,
145145
});
146146

__tests__/workerUtils.addJobs.test.ts

Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
import type { Job } from "../src/index";
2+
import {
3+
addJobAdhoc,
4+
makeWorkerUtils,
5+
runTaskListOnce,
6+
Task,
7+
WorkerSharedOptions,
8+
WorkerUtils,
9+
} from "../src/index";
10+
import { getJobs, HOUR, reset, setupFakeTimers, withPgClient } from "./helpers";
11+
12+
const { setTime } = setupFakeTimers();
13+
const REFERENCE_TIMESTAMP = 1609459200000; /* 1st January 2021, 00:00:00 UTC */
14+
15+
// NOTE: many of these tests are copied from the `addJob` test file.
16+
17+
const options: WorkerSharedOptions = {};
18+
19+
let utils: WorkerUtils | null = null;
20+
afterEach(async () => {
21+
await utils?.release();
22+
utils = null;
23+
});
24+
25+
test("runs a job added through the worker utils", () =>
26+
withPgClient(async (pgClient, { TEST_CONNECTION_STRING }) => {
27+
await reset(pgClient, options);
28+
29+
// Schedule a job
30+
utils = await makeWorkerUtils({
31+
connectionString: TEST_CONNECTION_STRING,
32+
});
33+
await utils.addJobs([{ identifier: "job3", payload: { a: 1 } }]);
34+
await utils.release();
35+
utils = null;
36+
37+
// Assert that it has an entry in jobs / job_queues
38+
const jobs = await getJobs(pgClient);
39+
expect(jobs).toHaveLength(1);
40+
41+
const task: Task = jest.fn();
42+
const taskList = { task };
43+
await runTaskListOnce(options, taskList, pgClient);
44+
}));
45+
46+
test("supports the jobKey API", () =>
47+
withPgClient(async (pgClient, { TEST_CONNECTION_STRING }) => {
48+
await reset(pgClient, options);
49+
50+
// Schedule a job
51+
utils = await makeWorkerUtils({
52+
connectionString: TEST_CONNECTION_STRING,
53+
});
54+
await utils.addJobs([
55+
{ identifier: "job3", payload: { a: 1 }, jobKey: "UNIQUE" },
56+
]);
57+
await utils.addJobs([
58+
{ identifier: "job3", payload: { a: 2 }, jobKey: "UNIQUE" },
59+
]);
60+
await utils.addJobs([
61+
{ identifier: "job3", payload: { a: 3 }, jobKey: "UNIQUE" },
62+
]);
63+
await utils.addJobs([
64+
{ identifier: "job3", payload: { a: 4 }, jobKey: "UNIQUE" },
65+
]);
66+
await utils.release();
67+
utils = null;
68+
69+
// Assert that it has an entry in jobs / job_queues
70+
const jobs = await getJobs(pgClient);
71+
expect(jobs).toHaveLength(1);
72+
expect((jobs[0].payload as any).a).toBe(4);
73+
expect(jobs[0].revision).toBe(3);
74+
75+
const task: Task = jest.fn();
76+
const taskList = { task };
77+
await runTaskListOnce(options, taskList, pgClient);
78+
}));
79+
80+
test("supports the jobKey API with jobKeyMode", () =>
81+
withPgClient(async (pgClient, { TEST_CONNECTION_STRING }) => {
82+
await reset(pgClient, options);
83+
84+
// Schedule a job
85+
utils = await makeWorkerUtils({
86+
connectionString: TEST_CONNECTION_STRING,
87+
});
88+
const runAt1 = new Date("2200-01-01T00:00:00Z");
89+
const runAt2 = new Date("2201-01-01T00:00:00Z");
90+
// can't use unsafe dedupe in batch mode
91+
// const runAt3 = new Date("2202-01-01T00:00:00Z");
92+
const runAt4 = new Date("2203-01-01T00:00:00Z");
93+
let job: Job;
94+
95+
// Job first added in replace mode:
96+
[job] = await utils.addJobs([
97+
{
98+
identifier: "job3",
99+
payload: { a: 1 },
100+
jobKey: "UNIQUE",
101+
runAt: runAt1,
102+
},
103+
]);
104+
expect(job.revision).toBe(0);
105+
expect(job.payload).toEqual({ a: 1 });
106+
expect(job.run_at.toISOString()).toBe(runAt1.toISOString());
107+
108+
// Now updated, but preserve run_at
109+
[job] = await utils.addJobs(
110+
[
111+
{
112+
identifier: "job3",
113+
payload: { a: 2 },
114+
jobKey: "UNIQUE",
115+
runAt: runAt2,
116+
},
117+
],
118+
true,
119+
);
120+
expect(job.revision).toBe(1);
121+
expect(job.payload).toEqual({ a: 2 });
122+
expect(job.run_at.toISOString()).toBe(runAt1.toISOString());
123+
124+
// No unsafe dedupe here, batch mode doesn't support it
125+
126+
// Replace the job one final time
127+
[job] = await utils.addJobs([
128+
{
129+
identifier: "job3",
130+
payload: { a: 4 },
131+
jobKey: "UNIQUE",
132+
runAt: runAt4,
133+
},
134+
]);
135+
expect(job.revision).toBe(2);
136+
expect(job.payload).toEqual({ a: 4 });
137+
expect(job.run_at.toISOString()).toBe(runAt4.toISOString());
138+
139+
await utils.release();
140+
utils = null;
141+
142+
// Assert that it has an entry in jobs / job_queues
143+
const jobs = await getJobs(pgClient);
144+
expect(jobs).toHaveLength(1);
145+
expect(jobs[0].revision).toBe(2);
146+
expect((jobs[0].payload as any).a).toBe(4);
147+
expect(jobs[0].run_at.toISOString()).toBe(runAt4.toISOString());
148+
149+
const task: Task = jest.fn();
150+
const taskList = { task };
151+
await runTaskListOnce(options, taskList, pgClient);
152+
}));
153+
154+
test("runs a job added through the addJob shortcut function", () =>
155+
withPgClient(async (pgClient, { TEST_CONNECTION_STRING }) => {
156+
await reset(pgClient, options);
157+
// Schedule a job
158+
await addJobAdhoc({ connectionString: TEST_CONNECTION_STRING }, "job3", {
159+
a: 1,
160+
});
161+
162+
// Assert that it has an entry in jobs / job_queues
163+
const jobs = await getJobs(pgClient);
164+
expect(jobs).toHaveLength(1);
165+
166+
const task: Task = jest.fn();
167+
const taskList = { task };
168+
await runTaskListOnce(options, taskList, pgClient);
169+
}));
170+
171+
test("adding job respects useNodeTime", () =>
172+
withPgClient(async (pgClient, { TEST_CONNECTION_STRING }) => {
173+
await setTime(REFERENCE_TIMESTAMP);
174+
await reset(pgClient, options);
175+
176+
// Schedule a job
177+
utils = await makeWorkerUtils({
178+
connectionString: TEST_CONNECTION_STRING,
179+
useNodeTime: true,
180+
});
181+
const timeOfAddJob = REFERENCE_TIMESTAMP + 1 * HOUR;
182+
await setTime(timeOfAddJob);
183+
await utils.addJobs([{ identifier: "job3", payload: { a: 1 } }]);
184+
await utils.release();
185+
utils = null;
186+
187+
// Assert that it has an entry in jobs / job_queues
188+
const jobs = await getJobs(pgClient);
189+
expect(jobs).toHaveLength(1);
190+
// Assert the run_at is within a couple of seconds of timeOfAddJob, even
191+
// though PostgreSQL has a NOW() that's many months later.
192+
const runAt = jobs[0].run_at;
193+
expect(+runAt).toBeGreaterThan(timeOfAddJob - 2000);
194+
expect(+runAt).toBeLessThan(timeOfAddJob + 2000);
195+
}));
196+
197+
test("adding lots of jobs works", () =>
198+
withPgClient(async (pgClient, { TEST_CONNECTION_STRING }) => {
199+
await reset(pgClient, options);
200+
201+
// Schedule a job
202+
utils = await makeWorkerUtils({
203+
connectionString: TEST_CONNECTION_STRING,
204+
});
205+
const timeOfAddJob = REFERENCE_TIMESTAMP + 1 * HOUR;
206+
await setTime(timeOfAddJob);
207+
await utils.addJobs([
208+
{ identifier: "job3", payload: { a: 1 } },
209+
{ identifier: "job3", payload: { a: 2 } },
210+
{ identifier: "job3", payload: { a: 3 } },
211+
{ identifier: "job3", payload: { a: 4 } },
212+
{ identifier: "job3", payload: { a: 5 } },
213+
{ identifier: "job3", payload: { a: 6 } },
214+
]);
215+
await utils.release();
216+
utils = null;
217+
218+
// Assert that it has an entry in jobs / job_queues
219+
const jobs = await getJobs(pgClient);
220+
expect(jobs).toHaveLength(6);
221+
const a = jobs.map((j) => j.payload.a).sort();
222+
expect(a).toEqual([1, 2, 3, 4, 5, 6]);
223+
}));

examples/readme/events.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
const { run, quickAddJob } = require(/* "graphile-worker" */ "../..");
1+
const { run, addJobAdhoc } = require(/* "graphile-worker" */ "../..");
22

33
async function main() {
44
// Run a worker to execute jobs:
@@ -24,7 +24,7 @@ async function main() {
2424
});
2525

2626
// Or add a job to be executed:
27-
await quickAddJob(
27+
await addJobAdhoc(
2828
// makeWorkerUtils options
2929
{ connectionString: "postgres:///my_db" },
3030

src/helpers.ts

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ import { Pool, PoolClient } from "pg";
33
import defer, { Deferred } from "./deferred";
44
import {
55
AddJobFunction,
6+
AddJobsFunction,
7+
DbJob,
8+
DbJobSpec,
69
EnhancedWithPgClient,
710
Job,
811
JobHelpers,
@@ -65,6 +68,55 @@ export function makeAddJob(
6568
};
6669
}
6770

71+
export function makeAddJobs(
72+
compiledSharedOptions: CompiledSharedOptions,
73+
withPgClient: WithPgClient,
74+
): AddJobsFunction {
75+
const {
76+
escapedWorkerSchema,
77+
resolvedPreset: {
78+
worker: { useNodeTime },
79+
},
80+
} = compiledSharedOptions;
81+
return (jobSpecs, jobKeyPreserveRunAt) =>
82+
withPgClient(async (pgClient) => {
83+
const NOW = useNodeTime ? new Date().toISOString() : undefined;
84+
const dbSpecs = jobSpecs.map(
85+
(spec): DbJobSpec => ({
86+
identifier: spec.identifier,
87+
payload: spec.payload,
88+
queue_name: spec.queueName,
89+
run_at: spec.runAt?.toISOString() ?? NOW,
90+
max_attempts: spec.maxAttempts,
91+
job_key: spec.jobKey,
92+
priority: spec.priority,
93+
flags: spec.flags,
94+
}),
95+
);
96+
const { rows: dbJobs } = await pgClient.query<DbJob>(
97+
`\
98+
select *
99+
from ${escapedWorkerSchema}.add_jobs(
100+
array(
101+
select json_populate_recordset(null::${escapedWorkerSchema}.job_spec, $1::json)
102+
),
103+
$2::boolean
104+
);`,
105+
[JSON.stringify(dbSpecs), jobKeyPreserveRunAt],
106+
);
107+
const jobs: Job[] = [];
108+
for (let i = 0, l = jobSpecs.length; i < l; i++) {
109+
const dbJob = dbJobs[i];
110+
const jobSpec = jobSpecs[i];
111+
jobs.push({
112+
...dbJob,
113+
task_identifier: jobSpec.identifier,
114+
});
115+
}
116+
return jobs;
117+
});
118+
}
119+
68120
const $$cache = Symbol("queueNameById");
69121
const $$nextBatch = Symbol("pendingQueueIds");
70122
function getQueueName(
@@ -197,6 +249,7 @@ export function makeJobHelpers(
197249
query: (queryText, values) =>
198250
withPgClient((pgClient) => pgClient.query(queryText, values)),
199251
addJob: makeAddJob(compiledSharedOptions, withPgClient),
252+
addJobs: makeAddJobs(compiledSharedOptions, withPgClient),
200253

201254
// TODO: add an API for giving workers more helpers
202255
};

src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export {
2626
export { runTaskList, runTaskListOnce } from "./main";
2727
export { WorkerPreset } from "./preset";
2828
export { run, runMigrations, runOnce } from "./runner";
29-
export { makeWorkerUtils, quickAddJob } from "./workerUtils";
29+
export { addJobAdhoc, makeWorkerUtils, quickAddJob } from "./workerUtils";
3030

3131
export { getTasks };
3232
export { getCronItems };

0 commit comments

Comments
 (0)