Skip to content

Commit c4e18b3

Browse files
authored
Merge pull request #1823 from FaberVitale/refactor/action-m-use-abort-controller
2 parents 4be26d6 + 11bcbaa commit c4e18b3

File tree

12 files changed

+759
-1054
lines changed

12 files changed

+759
-1054
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
export class TaskAbortError implements Error {
2+
name: string
3+
message: string
4+
constructor(public reason?: string) {
5+
this.name = 'TaskAbortError'
6+
this.message = `task cancelled` + (reason != null ? `: "${reason}"` : '')
7+
}
8+
}

packages/action-listener-middleware/src/index.ts

Lines changed: 149 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import type {
44
AnyAction,
55
Action,
66
ThunkDispatch,
7+
MiddlewareAPI,
78
} from '@reduxjs/toolkit'
89
import { createAction, nanoid } from '@reduxjs/toolkit'
910

@@ -28,17 +29,19 @@ import type {
2829
WithMiddlewareType,
2930
TakePattern,
3031
ListenerErrorInfo,
32+
ForkedTaskExecutor,
33+
ForkedTask,
3134
} from './types'
32-
35+
import { assertFunction } from './utils'
36+
import { TaskAbortError } from './exceptions'
3337
import {
34-
Job,
35-
SupervisorJob,
36-
JobHandle,
37-
JobCancellationReason,
38-
JobCancellationException,
39-
} from './job'
40-
import { Outcome } from './outcome'
41-
38+
runTask,
39+
promisifyAbortSignal,
40+
validateActive,
41+
createPause,
42+
createDelay,
43+
} from './task'
44+
export { TaskAbortError } from './exceptions'
4245
export type {
4346
ActionListener,
4447
ActionListenerMiddleware,
@@ -51,87 +54,101 @@ export type {
5154
TypedAddListener,
5255
TypedAddListenerAction,
5356
Unsubscribe,
57+
ForkedTaskExecutor,
58+
ForkedTask,
59+
ForkedTaskAPI,
60+
AsyncTaskExecutor,
61+
SyncTaskExecutor,
62+
TaskCancelled,
63+
TaskRejected,
64+
TaskResolved,
65+
TaskResult,
5466
} from './types'
5567

56-
function assertFunction(
57-
func: unknown,
58-
expected: string
59-
): asserts func is (...args: unknown[]) => unknown {
60-
if (typeof func !== 'function') {
61-
throw new TypeError(`${expected} is not a function`)
62-
}
63-
}
64-
6568
const defaultWhen: MiddlewarePhase = 'afterReducer'
6669
const actualMiddlewarePhases = ['beforeReducer', 'afterReducer'] as const
6770

68-
function createTakePattern<S>(
71+
const createFork = (parentAbortSignal: AbortSignal) => {
72+
return <T>(taskExecutor: ForkedTaskExecutor<T>): ForkedTask<T> => {
73+
assertFunction(taskExecutor, 'taskExecutor')
74+
const childAbortController = new AbortController()
75+
const cancel = () => {
76+
childAbortController.abort()
77+
}
78+
79+
const result = runTask<T>(async (): Promise<T> => {
80+
validateActive(parentAbortSignal)
81+
validateActive(childAbortController.signal)
82+
const result = (await taskExecutor({
83+
pause: createPause(childAbortController.signal),
84+
delay: createDelay(childAbortController.signal),
85+
signal: childAbortController.signal,
86+
})) as T
87+
validateActive(parentAbortSignal)
88+
validateActive(childAbortController.signal)
89+
return result
90+
}, cancel)
91+
92+
return {
93+
result,
94+
cancel,
95+
}
96+
}
97+
}
98+
99+
const createTakePattern = <S>(
69100
addListener: AddListenerOverloads<Unsubscribe, S, Dispatch<AnyAction>>,
70-
parentJob: Job<any>
71-
): TakePattern<S> {
101+
signal: AbortSignal
102+
): TakePattern<S> => {
72103
/**
73104
* A function that takes an ActionListenerPredicate and an optional timeout,
74105
* and resolves when either the predicate returns `true` based on an action
75106
* state combination or when the timeout expires.
76107
* If the parent listener is canceled while waiting, this will throw a
77-
* JobCancellationException.
108+
* TaskAbortError.
78109
*/
79-
async function take<P extends AnyActionListenerPredicate<S>>(
110+
const take = async <P extends AnyActionListenerPredicate<S>>(
80111
predicate: P,
81112
timeout: number | undefined
82-
) {
113+
) => {
114+
validateActive(signal)
115+
83116
// Placeholder unsubscribe function until the listener is added
84117
let unsubscribe: Unsubscribe = () => {}
85118

86-
// We'll add an additional nested Job representing this function.
87-
// TODO This is really a duplicate of the other job inside the middleware.
88-
// This behavior requires some additional nesting:
89-
// We're going to create a `Promise` representing the result of the listener,
90-
// but then wrap that in an `Outcome` for consistent error handling.
91-
let job: Job<[AnyAction, S, S]> = parentJob.launch(async (job) =>
92-
Outcome.wrap(
93-
new Promise<[AnyAction, S, S]>((resolve) => {
94-
// Inside the Promise, we synchronously add the listener.
95-
unsubscribe = addListener({
96-
predicate: predicate as any,
97-
listener: (action, listenerApi): void => {
98-
// One-shot listener that cleans up as soon as the predicate passes
99-
listenerApi.unsubscribe()
100-
// Resolve the promise with the same arguments the predicate saw
101-
resolve([
102-
action,
103-
listenerApi.getState(),
104-
listenerApi.getOriginalState(),
105-
])
106-
},
107-
parentJob,
108-
})
109-
})
119+
const tuplePromise = new Promise<[AnyAction, S, S]>((resolve) => {
120+
// Inside the Promise, we synchronously add the listener.
121+
unsubscribe = addListener({
122+
predicate: predicate as any,
123+
listener: (action, listenerApi): void => {
124+
// One-shot listener that cleans up as soon as the predicate passes
125+
listenerApi.unsubscribe()
126+
// Resolve the promise with the same arguments the predicate saw
127+
resolve([
128+
action,
129+
listenerApi.getState(),
130+
listenerApi.getOriginalState(),
131+
])
132+
},
133+
})
134+
})
135+
136+
const promises: (Promise<null> | Promise<[AnyAction, S, S]>)[] = [
137+
promisifyAbortSignal(signal),
138+
tuplePromise,
139+
]
140+
141+
if (timeout != null) {
142+
promises.push(
143+
new Promise<null>((resolve) => setTimeout(resolve, timeout, null))
110144
)
111-
)
112-
113-
let result: Outcome<[AnyAction, S, S]>
145+
}
114146

115147
try {
116-
// Run the job and use the timeout if given
117-
result = await (timeout !== undefined
118-
? job.runWithTimeout(timeout)
119-
: job.run())
120-
121-
if (result.isOk()) {
122-
// Resolve the actual `take` promise with the action+states
123-
return result.value
124-
} else {
125-
if (
126-
result.error instanceof JobCancellationException &&
127-
result.error.reason === JobCancellationReason.JobCancelled
128-
) {
129-
// The `take` job itself was canceled due to timeout.
130-
return null
131-
}
132-
// The parent was canceled - reject this promise with that error
133-
throw result.error
134-
}
148+
const output = await Promise.race(promises)
149+
150+
validateActive(signal)
151+
return output
135152
} finally {
136153
// Always clean up the listener
137154
unsubscribe()
@@ -171,10 +188,10 @@ export const createListenerEntry: TypedCreateListenerEntry<unknown> = (
171188
listener: options.listener,
172189
type,
173190
predicate,
191+
pendingSet: new Set<AbortController>(),
174192
unsubscribe: () => {
175193
throw new Error('Unsubscribe not initialized')
176194
},
177-
parentJob: new SupervisorJob(),
178195
}
179196

180197
return entry
@@ -281,9 +298,9 @@ export function createActionListenerMiddleware<
281298
return entry.unsubscribe
282299
}
283300

284-
function findListenerEntry(
301+
const findListenerEntry = (
285302
comparator: (entry: ListenerEntry) => boolean
286-
): ListenerEntry | undefined {
303+
): ListenerEntry | undefined => {
287304
for (const entry of listenerMap.values()) {
288305
if (comparator(entry)) {
289306
return entry
@@ -334,6 +351,64 @@ export function createActionListenerMiddleware<
334351
return true
335352
}
336353

354+
const notifyListener = async (
355+
entry: ListenerEntry<unknown, Dispatch<AnyAction>>,
356+
action: AnyAction,
357+
api: MiddlewareAPI,
358+
getOriginalState: () => S,
359+
currentPhase: MiddlewarePhase
360+
) => {
361+
const internalTaskController = new AbortController()
362+
const take = createTakePattern(addListener, internalTaskController.signal)
363+
const condition: ConditionFunction<S> = (predicate, timeout) => {
364+
return take(predicate, timeout).then(Boolean)
365+
}
366+
const delay = createDelay(internalTaskController.signal)
367+
const fork = createFork(internalTaskController.signal)
368+
const pause: (val: Promise<any>) => Promise<any> = createPause(
369+
internalTaskController.signal
370+
)
371+
try {
372+
entry.pendingSet.add(internalTaskController)
373+
await Promise.resolve(
374+
entry.listener(action, {
375+
...api,
376+
getOriginalState,
377+
condition,
378+
take,
379+
delay,
380+
pause,
381+
currentPhase,
382+
extra,
383+
signal: internalTaskController.signal,
384+
fork,
385+
unsubscribe: entry.unsubscribe,
386+
subscribe: () => {
387+
listenerMap.set(entry.id, entry)
388+
},
389+
cancelPrevious: () => {
390+
entry.pendingSet.forEach((controller, _, set) => {
391+
if (controller !== internalTaskController) {
392+
controller.abort()
393+
set.delete(controller)
394+
}
395+
})
396+
},
397+
})
398+
)
399+
} catch (listenerError) {
400+
if (!(listenerError instanceof TaskAbortError)) {
401+
safelyNotifyError(onError, listenerError, {
402+
raisedBy: 'listener',
403+
phase: currentPhase,
404+
})
405+
}
406+
} finally {
407+
internalTaskController.abort() // Notify that the task has completed
408+
entry.pendingSet.delete(internalTaskController)
409+
}
410+
}
411+
337412
const middleware: Middleware<
338413
{
339414
(action: Action<'actionListenerMiddleware/add'>): Unsubscribe
@@ -390,47 +465,7 @@ export function createActionListenerMiddleware<
390465
continue
391466
}
392467

393-
entry.parentJob.launchAndRun(async (jobHandle) => {
394-
const take = createTakePattern(addListener, jobHandle as Job<any>)
395-
const condition: ConditionFunction<S> = (predicate, timeout) => {
396-
return take(predicate, timeout).then(Boolean)
397-
}
398-
399-
const result = await Outcome.try(async () =>
400-
entry.listener(action, {
401-
...api,
402-
getOriginalState,
403-
condition,
404-
take,
405-
currentPhase,
406-
extra,
407-
unsubscribe: entry.unsubscribe,
408-
subscribe: () => {
409-
listenerMap.set(entry.id, entry)
410-
},
411-
job: jobHandle,
412-
cancelPrevious: () => {
413-
entry.parentJob.cancelChildren(
414-
new JobCancellationException(
415-
JobCancellationReason.JobCancelled
416-
),
417-
[jobHandle]
418-
)
419-
},
420-
})
421-
)
422-
if (
423-
result.isError() &&
424-
!(result.error instanceof JobCancellationException)
425-
) {
426-
safelyNotifyError(onError, result.error, {
427-
raisedBy: 'listener',
428-
phase: currentPhase,
429-
})
430-
}
431-
432-
return Outcome.ok(1)
433-
})
468+
notifyListener(entry, action, api, getOriginalState, currentPhase)
434469
}
435470
if (currentPhase === 'beforeReducer') {
436471
result = next(action)

0 commit comments

Comments
 (0)