Skip to content

Commit bfb7c36

Browse files
committed
Ensure that LocalQueue exits with the correct status (e.g. rejects if returning jobs fails)
1 parent 03d1dee commit bfb7c36

File tree

1 file changed

+82
-32
lines changed

1 file changed

+82
-32
lines changed

src/localQueue.ts

Lines changed: 82 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
import { MINUTE, SECOND } from "./cronConstants";
1111
import defer, { Deferred } from "./deferred";
1212
import { GetJobFunction, Job, TaskList, WorkerPool } from "./interfaces";
13+
import { coerceError } from "./lib";
1314
import { getJob as baseGetJob } from "./sql/getJob";
1415
import { returnJobs } from "./sql/returnJobs";
1516

@@ -108,7 +109,9 @@ export class LocalQueue {
108109
// when the queue is pulsed during a fetch.
109110
fetchAgain = false;
110111
public readonly mode: LocalQueueMode = STARTING;
111-
private promise = defer();
112+
/** The promise that resolves/rejects when the queue is disposed of */
113+
private _finPromise = defer();
114+
private errors: Error[] = [];
112115
/** A count of the number of "background" processes such as fetching or returning jobs */
113116
private backgroundCount = 0;
114117

@@ -161,17 +164,47 @@ export class LocalQueue {
161164
});
162165
}
163166

167+
private fin() {
168+
assert.equal(this.mode, "RELEASED");
169+
assert.equal(this.backgroundCount, 0);
170+
if (this.errors.length === 1) {
171+
this._finPromise.reject(this.errors[0]);
172+
} else if (this.errors.length > 1) {
173+
this._finPromise.reject(new AggregateError(this.errors));
174+
} else {
175+
this._finPromise.resolve();
176+
}
177+
}
178+
164179
private decreaseBackgroundCount = () => {
165180
this.backgroundCount--;
166181
if (this.mode === "RELEASED" && this.backgroundCount === 0) {
167-
this.promise.resolve();
182+
this.fin();
183+
}
184+
};
185+
186+
private decreaseBackgroundCountWithError = (e: unknown) => {
187+
this.backgroundCount--;
188+
if (this.mode === "RELEASED") {
189+
this.errors.push(coerceError(e));
190+
if (this.backgroundCount === 0) {
191+
this.fin();
192+
}
193+
} else {
194+
this.compiledSharedOptions.logger.error(
195+
`Backgrounding should never yield errors when the queue is not RELEASED`,
196+
{ error: e },
197+
);
168198
}
169199
};
170200

171201
/**
172202
* For promises that happen in the background, but that we want to ensure are
173203
* handled before we release the queue (so that the database pool isn't
174204
* released too early).
205+
*
206+
* IMPORTANT: never raise an error from background unless mode === "RELEASED" - you
207+
* need to handle errors yourself!
175208
*/
176209
private background(promise: Promise<void>) {
177210
if (this.mode === "RELEASED" && this.backgroundCount === 0) {
@@ -180,7 +213,10 @@ export class LocalQueue {
180213
);
181214
}
182215
this.backgroundCount++;
183-
promise.then(this.decreaseBackgroundCount, this.decreaseBackgroundCount);
216+
promise.then(
217+
this.decreaseBackgroundCount,
218+
this.decreaseBackgroundCountWithError,
219+
);
184220
}
185221

186222
private setModePolling() {
@@ -265,7 +301,11 @@ export class LocalQueue {
265301
}
266302

267303
private returnJobs() {
268-
const jobsToReturn = this.jobQueue.splice(0, this.jobQueue.length);
304+
const l = this.jobQueue.length;
305+
if (l === 0) {
306+
return;
307+
}
308+
const jobsToReturn = this.jobQueue.splice(0, l);
269309
this.compiledSharedOptions.events.emit("localQueue:returnJobs", {
270310
localQueue: this,
271311
jobs: jobsToReturn,
@@ -279,16 +319,39 @@ export class LocalQueue {
279319
).then(
280320
() => {},
281321
(e) => {
282-
// TODO: handle this better!
283-
this.compiledSharedOptions.logger.error(
284-
`Failed to return jobs from local queue to database queue`,
285-
{ error: e },
286-
);
322+
if (this.mode === "RELEASED") {
323+
throw new Error(
324+
`Error occurred whilst returning jobs from local queue to database queue: ${
325+
coerceError(e).message
326+
}`,
327+
);
328+
} else {
329+
// Return the jobs to the queue; MUST NOT HAPPEN IN RELEASED MODE.
330+
this.receivedJobs(jobsToReturn);
331+
this.compiledSharedOptions.logger.error(
332+
`Failed to return jobs from local queue to database queue`,
333+
{ error: e },
334+
);
335+
}
287336
},
288337
),
289338
);
290339
}
291340

341+
private receivedJobs(jobs: Job[]) {
342+
const jobCount = jobs.length;
343+
const workerCount = Math.min(jobCount, this.workerQueue.length);
344+
const workers = this.workerQueue.splice(0, workerCount);
345+
for (let i = 0; i < jobCount; i++) {
346+
const job = jobs[i];
347+
if (i < workerCount) {
348+
workers[i].resolve(job);
349+
} else {
350+
this.jobQueue.push(job);
351+
}
352+
}
353+
}
354+
292355
private fetch = (): void => {
293356
if (this.fetchTimer) {
294357
clearTimeout(this.fetchTimer);
@@ -364,11 +427,6 @@ export class LocalQueue {
364427
jobs,
365428
});
366429

367-
assert.equal(
368-
this.jobQueue.length,
369-
0,
370-
"Should not fetch when job queue isn't empty (recheck)",
371-
);
372430
jobCount = jobs.length;
373431
fetchedMax = jobCount >= this.getJobBatchSize;
374432
refetchDelayThresholdSurpassed =
@@ -381,17 +439,7 @@ export class LocalQueue {
381439

382440
// NOTE: we don't need to handle `this.mode === RELEASED` here because
383441
// being in that mode guarantees the workerQueue is empty.
384-
385-
const workerCount = Math.min(jobCount, this.workerQueue.length);
386-
const workers = this.workerQueue.splice(0, workerCount);
387-
for (let i = 0; i < jobCount; i++) {
388-
const job = jobs[i];
389-
if (i < workerCount) {
390-
workers[i].resolve(job);
391-
} else {
392-
this.jobQueue.push(job);
393-
}
394-
}
442+
this.receivedJobs(jobs);
395443
} catch (e) {
396444
// Error happened; rely on poll interval.
397445
this.compiledSharedOptions.logger.error(
@@ -573,7 +621,7 @@ export class LocalQueue {
573621
if (this.mode !== "RELEASED") {
574622
this.setModeReleased();
575623
}
576-
return this.promise;
624+
return this._finPromise;
577625
}
578626

579627
private setModeReleased() {
@@ -594,14 +642,16 @@ export class LocalQueue {
594642
futureJobs.forEach((futureJob) => futureJob.resolve(undefined));
595643

596644
// Release next fetch call
597-
if (this.fetchTimer) {
645+
if (this.fetchTimer != null) {
646+
// No need to return jobs in POLLING mode
598647
clearTimeout(this.fetchTimer);
599648
this.fetchTimer = null;
600-
this.promise.resolve();
601649
} else {
602-
// Rely on checking mode at end of fetch
650+
// There's a fetch in progress, so backgroundCount will not be 0, and
651+
// fetch handles calling returnJobs if it completes when in RELEASED
652+
// mode.
603653
}
604-
// No need to return jobs
654+
605655
break;
606656
}
607657
case WAITING: {
@@ -615,7 +665,7 @@ export class LocalQueue {
615665
break;
616666
}
617667
case TTL_EXPIRED: {
618-
// No action necessary
668+
// No action necessary, jobs are already returned
619669
break;
620670
}
621671
case STARTING: {
@@ -633,7 +683,7 @@ export class LocalQueue {
633683
}
634684

635685
if (this.backgroundCount === 0) {
636-
this.promise.resolve();
686+
this.fin();
637687
}
638688
}
639689
}

0 commit comments

Comments
 (0)