|
| 1 | +import { Outcome, Error as OutcomeError } from "@ethossoftworks/outcome" |
| 2 | + |
| 3 | +/** |
| 4 | + * A cancellable unit of work with optional cancellation hierarchy. |
| 5 | + * |
| 6 | + * Cancellation is cooperative, meaning the user has to define pause/suspension points in the task via the [pause] or |
| 7 | + * [ensureActive] methods or by checking [isActive]. |
| 8 | + * |
| 9 | + * Cancelling a parent Job will cancel all children Jobs launched with the job defined as its parent. All children must |
| 10 | + * also cooperatively check for cancellation. |
| 11 | + * |
| 12 | + * A parent job will not wait for any children jobs unless explicitly awaited on in the provided [JobFunc]. In this |
| 13 | + * instance, if the parent completes before its child has completed, the parent will be marked as completed and the |
| 14 | + * children will be cancelled at the next pause point. |
| 15 | + * |
| 16 | + * If an exception is thrown during a JobFunc, the job will cancel itself and its children and then rethrow the |
| 17 | + * exception to be handled by the user. |
| 18 | + * |
| 19 | + * Running a job more than once will result in a [JobCancellationException]. |
| 20 | + * |
| 21 | + * Note: When adding a try/catch mechanism inside of a [JobFunc], make sure to rethrow any [JobCancellationException] |
| 22 | + * exceptions, otherwise job cancellation will not work as intended. |
| 23 | + * |
| 24 | + * Example: |
| 25 | + * ``` |
| 26 | + const job = Job(async (job) => { |
| 27 | + * // This creates a pause point. If the job is cancelled while this operation is running, |
| 28 | + * // the job will immediately return [Error] with a [JobCancellationException] as its result. |
| 29 | + * const result = await job.pause(someLongRunningTask()); |
| 30 | + * |
| 31 | + * if (result.error != null) { |
| 32 | + * return Outcome.error("Problem"); |
| 33 | + * } |
| 34 | + * return Outcome.ok("All good!"); |
| 35 | + * }); |
| 36 | + * |
| 37 | + * const jobResult = await job.run(); |
| 38 | + * ``` |
| 39 | + */ |
| 40 | +export class Job<T> implements JobHandle { |
| 41 | + private _parent: Job<any> | undefined |
| 42 | + private _children: Job<any>[] = [] |
| 43 | + private _func: JobFunc<T> |
| 44 | + private _cancelResolver: (value: Outcome<T>) => void = () => {} |
| 45 | + private _isCancelled = false |
| 46 | + private _isCompleted = false |
| 47 | + |
| 48 | + private _cancelPromise: Promise<Outcome<T>> = new Promise<Outcome<T>>((resolve) => (this._cancelResolver = resolve)) |
| 49 | + |
| 50 | + constructor(func: JobFunc<T>, options?: { parent?: Job<any> }) { |
| 51 | + this._func = func |
| 52 | + this._parent = options?.parent |
| 53 | + this._parent?._addChild(this) |
| 54 | + } |
| 55 | + |
| 56 | + /** |
| 57 | + * Returns true if the given outcome was cancelled |
| 58 | + */ |
| 59 | + static isCancelled = (outcome: Outcome<unknown>): outcome is OutcomeError<JobCancellationException> => |
| 60 | + outcome.isError() && outcome.error instanceof JobCancellationException |
| 61 | + |
| 62 | + /** |
| 63 | + * Returns true if both the parent job (if one exists) and the current job are both active. A job is active at |
| 64 | + * creation and remains active until it has completed or been cancelled. |
| 65 | + */ |
| 66 | + get isActive(): boolean { |
| 67 | + return !this._isCompleted && !this._isCancelled && (this._parent?.isActive ?? true) |
| 68 | + } |
| 69 | + |
| 70 | + /** |
| 71 | + * Returns true if the job was completed successfully |
| 72 | + */ |
| 73 | + get isCompleted(): boolean { |
| 74 | + return !this.isActive && !this.isCancelled |
| 75 | + } |
| 76 | + |
| 77 | + /** |
| 78 | + * Returns true if the job was cancelled for any reason, either by explicit invocation of cancel or because its |
| 79 | + * parent was cancelled. This does not imply that the job has fully completed because it may still be finishing |
| 80 | + * whatever it was doing and waiting for its children to complete. |
| 81 | + */ |
| 82 | + get isCancelled(): boolean { |
| 83 | + return this._isCancelled || !(this._parent?.isCancelled ?? true) |
| 84 | + } |
| 85 | + |
| 86 | + /** |
| 87 | + * Checks if the parent job and current job are active and throws [JobCancellationException] if either are inactive. |
| 88 | + * |
| 89 | + * Note: This should only be used inside of a [JobFunc]. |
| 90 | + */ |
| 91 | + ensureActive() { |
| 92 | + if (this._isCompleted) throw new JobCancellationException(JobCancellationReason.JobCompleted) |
| 93 | + if (this._isCancelled) throw new JobCancellationException(JobCancellationReason.JobCancelled) |
| 94 | + |
| 95 | + // Check parent |
| 96 | + if (this._parent === undefined) return |
| 97 | + if (!this._parent.isActive) { |
| 98 | + if (this._parent.isCompleted) throw new JobCancellationException(JobCancellationReason.ParentJobCompleted) |
| 99 | + throw new JobCancellationException(JobCancellationReason.ParentJobCancelled) |
| 100 | + } |
| 101 | + } |
| 102 | + |
| 103 | + /** |
| 104 | + * The current number of active children jobs. |
| 105 | + */ |
| 106 | + get childCount(): number { |
| 107 | + return this._children.length |
| 108 | + } |
| 109 | + |
| 110 | + /** |
| 111 | + * Creates and returns a new job with the current job as the parent. |
| 112 | + */ |
| 113 | + launch<R>(func: JobFunc<R>): Job<R> { |
| 114 | + return new Job(func, { parent: this }) |
| 115 | + } |
| 116 | + |
| 117 | + /** |
| 118 | + * Creates a new job with the current job as the parent and executes it returning its result. |
| 119 | + * |
| 120 | + * Note: This should only be used inside of a [JobFunc]. |
| 121 | + */ |
| 122 | + launchAndRun<R>(func: JobFunc<R>): Promise<Outcome<R>> { |
| 123 | + return this.launch(func).run() |
| 124 | + } |
| 125 | + |
| 126 | + /** |
| 127 | + * Execute the job and return its result. |
| 128 | + * |
| 129 | + * [run] handles all [JobCancellationException] and will return an [Error] if a cancellation occurs. |
| 130 | + */ |
| 131 | + async run(): Promise<Outcome<T>> { |
| 132 | + try { |
| 133 | + this.ensureActive() |
| 134 | + const result = this._validateResult(await Promise.race([this._func(this), this._cancelPromise])) |
| 135 | + this.ensureActive() |
| 136 | + this._isCompleted = true |
| 137 | + return result |
| 138 | + } catch (e) { |
| 139 | + if (e instanceof JobCancellationException) { |
| 140 | + return Outcome.error(e) |
| 141 | + } else { |
| 142 | + this.cancel(new JobCancellationException(JobCancellationReason.JobCancelled)) |
| 143 | + throw e |
| 144 | + } |
| 145 | + } finally { |
| 146 | + this._parent?._removeChild(this) |
| 147 | + } |
| 148 | + } |
| 149 | + |
| 150 | + /** |
| 151 | + * Executes the job and cancels the job if it takes longer than the timeout to complete/cancel. |
| 152 | + */ |
| 153 | + async runWithTimeout(milliseconds: number): Promise<Outcome<T>> { |
| 154 | + setTimeout(() => this.cancel(), milliseconds) |
| 155 | + return this.run() |
| 156 | + } |
| 157 | + |
| 158 | + private _validateResult(result: Outcome<T>): Outcome<T> { |
| 159 | + if (result.isError() && result.error instanceof JobCancellationException) throw result.error |
| 160 | + return result |
| 161 | + } |
| 162 | + |
| 163 | + /** |
| 164 | + * Await a given [func] and ensures the job is active before and after [func] execution. This effectively |
| 165 | + * creates a pause/suspend point for the job and prevents returning a result or performing an action on a result |
| 166 | + * if the job has been completed/cancelled. |
| 167 | + * |
| 168 | + * Note: This should only be used inside of a [JobFunc]. |
| 169 | + */ |
| 170 | + async pause<R>(func: Promise<R>): Promise<R> { |
| 171 | + this.ensureActive() |
| 172 | + const result = await func |
| 173 | + this.ensureActive() |
| 174 | + return result |
| 175 | + } |
| 176 | + |
| 177 | + /** |
| 178 | + * Delays a job for the specified amount of time and checks for cancellation before and after the delay. |
| 179 | + */ |
| 180 | + async delay(milliseconds: number): Promise<void> { |
| 181 | + return await this.pause(new Promise((resolve) => setTimeout(resolve, milliseconds))) |
| 182 | + } |
| 183 | + |
| 184 | + /** |
| 185 | + * Cancels the current job and all children jobs. |
| 186 | + */ |
| 187 | + cancel(reason?: JobCancellationException) { |
| 188 | + this._parent?._removeChild(this) |
| 189 | + this.cancelChildren(new JobCancellationException(JobCancellationReason.ParentJobCancelled)) |
| 190 | + |
| 191 | + if (this._isCancelled || this._isCompleted) return |
| 192 | + this._isCancelled = true |
| 193 | + this._cancelResolver(Outcome.error(reason ?? new JobCancellationException(JobCancellationReason.JobCancelled))) |
| 194 | + } |
| 195 | + |
| 196 | + /** |
| 197 | + * Cancels all children jobs without cancelling the current job. |
| 198 | + */ |
| 199 | + cancelChildren(reason?: JobCancellationException) { |
| 200 | + const childrenCopy = [...this._children] |
| 201 | + childrenCopy.forEach((job) => |
| 202 | + job.cancel(reason ?? new JobCancellationException(JobCancellationReason.JobCancelled)) |
| 203 | + ) |
| 204 | + this._children = [] |
| 205 | + } |
| 206 | + |
| 207 | + private _addChild(child: Job<any>) { |
| 208 | + if (this.isActive) this._children.push(child) |
| 209 | + } |
| 210 | + |
| 211 | + private _removeChild(child: Job<any>) { |
| 212 | + this._children.splice(this._children.indexOf(child), 1) |
| 213 | + } |
| 214 | +} |
| 215 | + |
| 216 | +/** |
| 217 | + * A helper extension of [Job] that never completes until it is cancelled. This effectively provides a long-running |
| 218 | + * context to launch children jobs in. |
| 219 | + */ |
| 220 | +export class SupervisorJob extends Job<void> { |
| 221 | + constructor(parent?: Job<any>) { |
| 222 | + super( |
| 223 | + () => new Promise<Outcome<void>>(() => {}), |
| 224 | + { parent: parent } |
| 225 | + ) |
| 226 | + } |
| 227 | +} |
| 228 | + |
| 229 | +/** |
| 230 | + * The block of work a [Job] executes. The [job] parameter is a handle of the job's instance to allow |
| 231 | + * launching of new jobs or pausing the job. |
| 232 | + */ |
| 233 | +export type JobFunc<T> = (job: JobHandle) => Promise<Outcome<T>> |
| 234 | + |
| 235 | +/** |
| 236 | + * A handle for the current job used in [JobFunc]. This interface is equivalent to [Job]'s interface with the exception |
| 237 | + * of [run] and [runWithTimeout] to prevent recursive running of the [Job] inside its [JobFunc]. |
| 238 | + */ |
| 239 | +interface JobHandle { |
| 240 | + isActive: boolean |
| 241 | + isCompleted: boolean |
| 242 | + isCancelled: boolean |
| 243 | + childCount: number |
| 244 | + ensureActive(): void |
| 245 | + launch<R>(func: JobFunc<R>): Job<R> |
| 246 | + launchAndRun<R>(func: JobFunc<R>): Promise<Outcome<R>> |
| 247 | + pause<R>(func: Promise<R>): Promise<R> |
| 248 | + delay(milliseconds: number): Promise<void> |
| 249 | + cancel(reason?: JobCancellationException): void |
| 250 | + cancelChildren(reason?: JobCancellationException): void |
| 251 | +} |
| 252 | + |
| 253 | +/** |
| 254 | + * Thrown when a job or its parent is cancelled or if a job is run more than once. |
| 255 | + */ |
| 256 | +export class JobCancellationException implements Error { |
| 257 | + name: string = "JobCancellationException" |
| 258 | + message: string = `${this.reason}` |
| 259 | + constructor(public reason: JobCancellationReason) {} |
| 260 | +} |
| 261 | + |
| 262 | +/** |
| 263 | + * The reason a job was cancelled. |
| 264 | + * |
| 265 | + * [ParentJobCancelled]: The parent job was cancelled |
| 266 | + * [ParentJobCompleted]: The parent job completed |
| 267 | + * [JobCancelled]: The current job was cancelled |
| 268 | + * [JobCompleted]: The current job was already completed. This only happens if the same job is run more than once. |
| 269 | + */ |
| 270 | +export enum JobCancellationReason { |
| 271 | + ParentJobCancelled, |
| 272 | + ParentJobCompleted, |
| 273 | + JobCancelled, |
| 274 | + JobCompleted, |
| 275 | +} |
0 commit comments