Skip to content

Commit 48ac9ea

Browse files
committed
Move promise handling outside of the gracefulShutdown/forcefulShutdown hooks
1 parent d14f3bf commit 48ac9ea

File tree

2 files changed

+41
-28
lines changed

2 files changed

+41
-28
lines changed

src/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -372,11 +372,11 @@ declare global {
372372

373373
poolGracefulShutdown(
374374
event: GraphileWorker.PoolGracefulShutdownEvent,
375-
): PromiseOrDirect<void>;
375+
): Promise<void>;
376376

377377
poolForcefulShutdown(
378378
event: GraphileWorker.PoolForcefulShutdownEvent,
379-
): PromiseOrDirect<void>;
379+
): Promise<void>;
380380
}
381381

382382
interface WorkerHooks {

src/main.ts

Lines changed: 39 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import {
1414
FailJobFunction,
1515
GetJobFunction,
1616
Job,
17-
PromiseOrDirect,
1817
RunOnceOptions,
1918
TaskList,
2019
WorkerEventMap,
@@ -589,7 +588,7 @@ export function _runTaskList(
589588
unregisterSignalHandlers = registerSignalHandlers(logger, events);
590589
}
591590

592-
const promise = defer();
591+
const _finPromise = defer();
593592

594593
let deactivatePromise: Promise<void> | null = null;
595594

@@ -671,21 +670,14 @@ export function _runTaskList(
671670
} catch (e) {
672671
errors.push(coerceError(e));
673672
}
673+
674674
if (errors.length === 1) {
675-
promise.reject(errors[0]);
675+
throw errors[0];
676676
} else if (errors.length > 1) {
677-
promise.reject(
678-
new AggregateError(
679-
errors,
680-
"Errors occurred whilst terminating queue",
681-
),
677+
throw new AggregateError(
678+
errors,
679+
"Errors occurred whilst terminating queue",
682680
);
683-
} else {
684-
promise.resolve();
685-
}
686-
687-
if (unregisterSignalHandlers) {
688-
unregisterSignalHandlers();
689681
}
690682
} else {
691683
try {
@@ -710,8 +702,22 @@ export function _runTaskList(
710702
// Make sure Node doesn't get upset about unhandled rejection
711703
abortPromise.then(null, () => /* noop */ void 0);
712704

713-
let gracefulShutdownPromise: PromiseOrDirect<void>;
714-
let forcefulShutdownPromise: PromiseOrDirect<void>;
705+
let gracefulShutdownPromise: Promise<void> | null = null;
706+
let forcefulShutdownPromise: Promise<void> | null = null;
707+
708+
const finWithError = (e: unknown) => {
709+
const error = e != null ? coerceError(e) : null;
710+
if (error) {
711+
_finPromise.reject(error);
712+
} else {
713+
_finPromise.resolve();
714+
}
715+
716+
if (unregisterSignalHandlers) {
717+
unregisterSignalHandlers();
718+
}
719+
};
720+
const fin = () => finWithError(null);
715721

716722
// This is a representation of us that can be interacted with externally
717723
const workerPool: WorkerPool = {
@@ -752,13 +758,13 @@ export function _runTaskList(
752758
logger.error(
753759
`gracefulShutdown called when forcefulShutdown is already in progress`,
754760
);
755-
return forcefulShutdownPromise;
761+
return forcefulShutdownPromise!;
756762
}
757763
if (workerPool._shuttingDown) {
758764
logger.error(
759765
`gracefulShutdown called when gracefulShutdown is already in progress`,
760766
);
761-
return gracefulShutdownPromise;
767+
return gracefulShutdownPromise!;
762768
}
763769

764770
workerPool._shuttingDown = true;
@@ -876,7 +882,7 @@ export function _runTaskList(
876882
// NOTE: we now rely on forcefulShutdown to handle terminate()
877883
if (this._forcefulShuttingDown) {
878884
// Skip the warning about double shutdown
879-
return forcefulShutdownPromise;
885+
return forcefulShutdownPromise!;
880886
} else {
881887
return this.forcefulShutdown(message);
882888
}
@@ -887,6 +893,8 @@ export function _runTaskList(
887893
},
888894
);
889895

896+
gracefulShutdownPromise.then(fin, finWithError);
897+
890898
const abortTimer = setTimeout(() => {
891899
abortController.abort();
892900
}, gracefulShutdownAbortTimeout);
@@ -903,7 +911,7 @@ export function _runTaskList(
903911
logger.error(
904912
`forcefulShutdown called when forcefulShutdown is already in progress`,
905913
);
906-
return forcefulShutdownPromise;
914+
return forcefulShutdownPromise!;
907915
}
908916

909917
workerPool._forcefulShuttingDown = true;
@@ -1000,29 +1008,34 @@ export function _runTaskList(
10001008
{ error: e },
10011009
);
10021010
if (!terminated) {
1011+
// Guaranteed to throw
10031012
await terminate(error);
10041013
}
1005-
throw e;
1014+
throw error;
10061015
}
10071016
if (!terminated) {
1017+
// Guaranteed to throw
10081018
await terminate(new Error("Forceful shutdown"));
10091019
}
10101020
},
10111021
);
10121022

1023+
// This should never call fin() since forceful shutdown always errors
1024+
forcefulShutdownPromise.then(fin, finWithError);
1025+
10131026
return forcefulShutdownPromise;
10141027
},
10151028

1016-
promise,
1029+
promise: _finPromise,
10171030

10181031
then(onfulfilled, onrejected) {
1019-
return promise.then(onfulfilled, onrejected);
1032+
return _finPromise.then(onfulfilled, onrejected);
10201033
},
10211034
catch(onrejected) {
1022-
return promise.catch(onrejected);
1035+
return _finPromise.catch(onrejected);
10231036
},
10241037
finally(onfinally) {
1025-
return promise.finally(onfinally);
1038+
return _finPromise.finally(onfinally);
10261039
},
10271040
_start: autostart
10281041
? null
@@ -1033,7 +1046,7 @@ export function _runTaskList(
10331046
},
10341047
};
10351048

1036-
promise.finally(() => {
1049+
_finPromise.finally(() => {
10371050
events.emit("pool:release", { pool: workerPool, workerPool });
10381051
});
10391052

0 commit comments

Comments
 (0)