Skip to content

Commit d79d204

Browse files
committed
The _moment_ that deactivate is called, all future getJob calls should immediately return undefined
1 parent 6f392f3 commit d79d204

File tree

1 file changed

+36
-27
lines changed

1 file changed

+36
-27
lines changed

src/main.ts

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import * as assert from "assert";
12
import { randomBytes } from "crypto";
23
import { EventEmitter } from "events";
34
import { Notification, Pool, PoolClient } from "pg";
@@ -594,34 +595,34 @@ export function _runTaskList(
594595

595596
function deactivate() {
596597
if (!deactivatePromise) {
598+
assert.equal(workerPool._active, true);
599+
workerPool._active = false;
600+
597601
deactivatePromise = (async () => {
598-
if (workerPool._active) {
599-
workerPool._active = false;
600-
const errors: Error[] = [];
601-
try {
602-
await localQueue?.release();
603-
} catch (rawE) {
604-
const e = coerceError(rawE);
605-
errors.push(e);
606-
// Log but continue regardless
607-
logger.error(`Releasing local queue failed: ${e}`, { error: rawE });
608-
}
609-
try {
610-
// Note: this runs regardless of success of the above
611-
await onDeactivate?.();
612-
} catch (rawE) {
613-
const e = coerceError(rawE);
614-
errors.push(e);
615-
// Log but continue regardless
616-
logger.error(`onDeactivate raised an error: ${e}`, { error: rawE });
617-
}
602+
const errors: Error[] = [];
603+
try {
604+
await localQueue?.release();
605+
} catch (rawE) {
606+
const e = coerceError(rawE);
607+
errors.push(e);
608+
// Log but continue regardless
609+
logger.error(`Releasing local queue failed: ${e}`, { error: rawE });
610+
}
611+
try {
612+
// Note: this runs regardless of success of the above
613+
await onDeactivate?.();
614+
} catch (rawE) {
615+
const e = coerceError(rawE);
616+
errors.push(e);
617+
// Log but continue regardless
618+
logger.error(`onDeactivate raised an error: ${e}`, { error: rawE });
619+
}
618620

619-
if (errors.length > 0) {
620-
throw new AggregateError(
621-
errors,
622-
"Errors occurred whilst deactivating queue",
623-
);
624-
}
621+
if (errors.length > 0) {
622+
throw new AggregateError(
623+
errors,
624+
"Errors occurred whilst deactivating queue",
625+
);
625626
}
626627
})();
627628
}
@@ -1087,8 +1088,16 @@ export function _runTaskList(
10871088
)
10881089
: null;
10891090
const getJob: GetJobFunction = localQueue
1090-
? localQueue.getJob // Already bound
1091+
? async (workerId, flagsToSkip) => {
1092+
if (!workerPool._active) {
1093+
return undefined;
1094+
}
1095+
return localQueue.getJob(workerId, flagsToSkip);
1096+
}
10911097
: async (_workerId, flagsToSkip) => {
1098+
if (!workerPool._active) {
1099+
return undefined;
1100+
}
10921101
const jobs = await baseGetJob(
10931102
compiledSharedOptions,
10941103
withPgClient,

0 commit comments

Comments
 (0)