Skip to content

Commit 11bcbaa

Browse files
committed
feat(action-listener-middleware): add fork API
- Changes * forked task executor now receive an API, currently { pause, delay, signal } * added lisnter.fork tests * re-enabled 'Long polling loop' test * removed outcome.ts it has been replaced by an ADT * micro optimazations to keep the size at 4.0K * added pause to listenerApi * encapsulated abortController in fork() output * listenerApi.delay now rejects as soon as the listner is cancelled - Build log Build "actionListenerMiddleware" to dist/esm: 1763 B: index.modern.js.gz 1575 B: index.modern.js.br Build "actionListenerMiddleware" to dist/module: 2.4 kB: index.js.gz 2.15 kB: index.js.br Build "actionListenerMiddleware" to dist/cjs: 2.4 kB: index.js.gz 2.15 kB: index.js.br - Test log PASS src/tests/listenerMiddleware.test.ts PASS src/tests/effectScenarios.test.ts PASS src/tests/fork.test.ts PASS src/tests/useCases.test.ts Test Suites: 4 passed, 4 total Tests: 56 passed, 56 total
1 parent dc2480f commit 11bcbaa

File tree

9 files changed

+621
-177
lines changed

9 files changed

+621
-177
lines changed

packages/action-listener-middleware/src/index.ts

Lines changed: 56 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,18 @@ import type {
2929
WithMiddlewareType,
3030
TakePattern,
3131
ListenerErrorInfo,
32-
TaskExecutor,
32+
ForkedTaskExecutor,
3333
ForkedTask,
3434
} from './types'
35-
35+
import { assertFunction } from './utils'
3636
import { TaskAbortError } from './exceptions'
37-
import { Outcome } from './outcome'
38-
export type { Outcome, Ok, Error } from './outcome'
37+
import {
38+
runTask,
39+
promisifyAbortSignal,
40+
validateActive,
41+
createPause,
42+
createDelay,
43+
} from './task'
3944
export { TaskAbortError } from './exceptions'
4045
export type {
4146
ActionListener,
@@ -49,86 +54,68 @@ export type {
4954
TypedAddListener,
5055
TypedAddListenerAction,
5156
Unsubscribe,
52-
TaskExecutor,
57+
ForkedTaskExecutor,
5358
ForkedTask,
59+
ForkedTaskAPI,
5460
AsyncTaskExecutor,
5561
SyncTaskExecutor,
62+
TaskCancelled,
63+
TaskRejected,
64+
TaskResolved,
65+
TaskResult,
5666
} from './types'
5767

58-
function assertFunction(
59-
func: unknown,
60-
expected: string
61-
): asserts func is (...args: unknown[]) => unknown {
62-
if (typeof func !== 'function') {
63-
throw new TypeError(`${expected} is not a function`)
64-
}
65-
}
66-
6768
const defaultWhen: MiddlewarePhase = 'afterReducer'
6869
const actualMiddlewarePhases = ['beforeReducer', 'afterReducer'] as const
6970

70-
function assertActive(signal: AbortSignal, reason?: string) {
71-
if (signal.aborted) {
72-
throw new TaskAbortError(reason)
73-
}
74-
}
75-
76-
function createDelay(signal: AbortSignal) {
77-
return async function delay(timeoutMs: number): Promise<void> {
78-
assertActive(signal)
79-
await new Promise((resolve) => setTimeout(resolve, timeoutMs))
80-
assertActive(signal)
81-
}
82-
}
83-
84-
function createFork(parentAbortSignal: AbortSignal) {
85-
return function fork<T>(childJobExecutor: TaskExecutor<T>): ForkedTask<T> {
71+
const createFork = (parentAbortSignal: AbortSignal) => {
72+
return <T>(taskExecutor: ForkedTaskExecutor<T>): ForkedTask<T> => {
73+
assertFunction(taskExecutor, 'taskExecutor')
8674
const childAbortController = new AbortController()
87-
const promise = Outcome.try(async () => {
88-
assertActive(parentAbortSignal)
89-
const result = await Promise.resolve(childJobExecutor())
90-
assertActive(parentAbortSignal)
91-
assertActive(childAbortController.signal)
75+
const cancel = () => {
76+
childAbortController.abort()
77+
}
9278

79+
const result = runTask<T>(async (): Promise<T> => {
80+
validateActive(parentAbortSignal)
81+
validateActive(childAbortController.signal)
82+
const result = (await taskExecutor({
83+
pause: createPause(childAbortController.signal),
84+
delay: createDelay(childAbortController.signal),
85+
signal: childAbortController.signal,
86+
})) as T
87+
validateActive(parentAbortSignal)
88+
validateActive(childAbortController.signal)
9389
return result
94-
})
90+
}, cancel)
91+
9592
return {
96-
promise,
97-
controller: childAbortController,
93+
result,
94+
cancel,
9895
}
9996
}
10097
}
10198

102-
function createTakePattern<S>(
99+
const createTakePattern = <S>(
103100
addListener: AddListenerOverloads<Unsubscribe, S, Dispatch<AnyAction>>,
104101
signal: AbortSignal
105-
): TakePattern<S> {
102+
): TakePattern<S> => {
106103
/**
107104
* A function that takes an ActionListenerPredicate and an optional timeout,
108105
* and resolves when either the predicate returns `true` based on an action
109106
* state combination or when the timeout expires.
110107
* If the parent listener is canceled while waiting, this will throw a
111108
* TaskAbortError.
112109
*/
113-
async function take<P extends AnyActionListenerPredicate<S>>(
110+
const take = async <P extends AnyActionListenerPredicate<S>>(
114111
predicate: P,
115112
timeout: number | undefined
116-
) {
117-
assertActive(signal)
113+
) => {
114+
validateActive(signal)
118115

119116
// Placeholder unsubscribe function until the listener is added
120117
let unsubscribe: Unsubscribe = () => {}
121118

122-
const signalPromise = new Promise<null>((_, reject) => {
123-
signal.addEventListener(
124-
'abort',
125-
() => {
126-
reject(new TaskAbortError())
127-
},
128-
{ once: true }
129-
)
130-
})
131-
132119
const tuplePromise = new Promise<[AnyAction, S, S]>((resolve) => {
133120
// Inside the Promise, we synchronously add the listener.
134121
unsubscribe = addListener({
@@ -147,7 +134,7 @@ function createTakePattern<S>(
147134
})
148135

149136
const promises: (Promise<null> | Promise<[AnyAction, S, S]>)[] = [
150-
signalPromise,
137+
promisifyAbortSignal(signal),
151138
tuplePromise,
152139
]
153140

@@ -160,7 +147,7 @@ function createTakePattern<S>(
160147
try {
161148
const output = await Promise.race(promises)
162149

163-
assertActive(signal)
150+
validateActive(signal)
164151
return output
165152
} finally {
166153
// Always clean up the listener
@@ -201,7 +188,7 @@ export const createListenerEntry: TypedCreateListenerEntry<unknown> = (
201188
listener: options.listener,
202189
type,
203190
predicate,
204-
taskAbortControllerSet: new Set<AbortController>(),
191+
pendingSet: new Set<AbortController>(),
205192
unsubscribe: () => {
206193
throw new Error('Unsubscribe not initialized')
207194
},
@@ -311,9 +298,9 @@ export function createActionListenerMiddleware<
311298
return entry.unsubscribe
312299
}
313300

314-
function findListenerEntry(
301+
const findListenerEntry = (
315302
comparator: (entry: ListenerEntry) => boolean
316-
): ListenerEntry | undefined {
303+
): ListenerEntry | undefined => {
317304
for (const entry of listenerMap.values()) {
318305
if (comparator(entry)) {
319306
return entry
@@ -364,30 +351,33 @@ export function createActionListenerMiddleware<
364351
return true
365352
}
366353

367-
async function notifyListener(
354+
const notifyListener = async (
368355
entry: ListenerEntry<unknown, Dispatch<AnyAction>>,
369356
action: AnyAction,
370357
api: MiddlewareAPI,
371358
getOriginalState: () => S,
372359
currentPhase: MiddlewarePhase
373-
) {
360+
) => {
374361
const internalTaskController = new AbortController()
375362
const take = createTakePattern(addListener, internalTaskController.signal)
376363
const condition: ConditionFunction<S> = (predicate, timeout) => {
377364
return take(predicate, timeout).then(Boolean)
378365
}
379366
const delay = createDelay(internalTaskController.signal)
380367
const fork = createFork(internalTaskController.signal)
381-
368+
const pause: (val: Promise<any>) => Promise<any> = createPause(
369+
internalTaskController.signal
370+
)
382371
try {
383-
entry.taskAbortControllerSet.add(internalTaskController)
372+
entry.pendingSet.add(internalTaskController)
384373
await Promise.resolve(
385374
entry.listener(action, {
386375
...api,
387376
getOriginalState,
388377
condition,
389378
take,
390379
delay,
380+
pause,
391381
currentPhase,
392382
extra,
393383
signal: internalTaskController.signal,
@@ -397,12 +387,10 @@ export function createActionListenerMiddleware<
397387
listenerMap.set(entry.id, entry)
398388
},
399389
cancelPrevious: () => {
400-
entry.taskAbortControllerSet.forEach((controller, _, set) => {
401-
if (
402-
controller !== internalTaskController &&
403-
!controller.signal.aborted
404-
) {
390+
entry.pendingSet.forEach((controller, _, set) => {
391+
if (controller !== internalTaskController) {
405392
controller.abort()
393+
set.delete(controller)
406394
}
407395
})
408396
},
@@ -417,7 +405,7 @@ export function createActionListenerMiddleware<
417405
}
418406
} finally {
419407
internalTaskController.abort() // Notify that the task has completed
420-
entry.taskAbortControllerSet.delete(internalTaskController)
408+
entry.pendingSet.delete(internalTaskController)
421409
}
422410
}
423411

packages/action-listener-middleware/src/outcome.ts

Lines changed: 0 additions & 58 deletions
This file was deleted.
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
import { TaskAbortError } from './exceptions'
2+
import type { TaskResult } from './types'
3+
import { noop } from './utils'
4+
5+
/**
6+
* Synchronously raises {@link TaskAbortError} if the task tied to the input `signal` has been cancelled.
7+
* @param signal
8+
* @param reason
9+
* @see {TaskAbortError}
10+
*/
11+
export const validateActive = (signal: AbortSignal, reason?: string): void => {
12+
if (signal.aborted) {
13+
throw new TaskAbortError(reason)
14+
}
15+
}
16+
17+
/**
18+
* Returns a promise that will reject {@link TaskAbortError} if the task is cancelled.
19+
* @param signal
20+
* @returns
21+
*/
22+
export const promisifyAbortSignal = (
23+
signal: AbortSignal,
24+
reason?: string
25+
): Promise<never> => {
26+
const promise = new Promise<never>((_, reject) => {
27+
const notifyRejection = () => reject(new TaskAbortError(reason))
28+
29+
if (signal.aborted) {
30+
notifyRejection()
31+
} else {
32+
signal.addEventListener('abort', notifyRejection, { once: true })
33+
}
34+
})
35+
36+
// We do not want 'unhandledRejection' warnings or crashes caused by cancelled tasks
37+
promise.catch(noop)
38+
39+
return promise
40+
}
41+
42+
/**
43+
* Runs a task and returns promise that resolves to {@link TaskResult}.
44+
*
45+
* Second argument is an optional `cleanUp` function that always runs after task.
46+
* @returns
47+
*/
48+
export const runTask = async <T>(
49+
task: () => Promise<T>,
50+
cleanUp?: () => void
51+
): Promise<TaskResult<T>> => {
52+
try {
53+
await Promise.resolve()
54+
const value = await task()
55+
return {
56+
status: 'ok',
57+
value,
58+
}
59+
} catch (error: any) {
60+
return {
61+
status: error instanceof TaskAbortError ? 'cancelled' : 'rejected',
62+
error,
63+
}
64+
} finally {
65+
cleanUp?.()
66+
}
67+
}
68+
69+
/**
70+
* Given an input `AbortSignal` and a promise returns another promise that resolves
71+
* as soon the input promise is provided or rejects as soon as
72+
* `AbortSignal.abort` is `true`.
73+
* @param signal
74+
* @returns
75+
*/
76+
export const createPause = <T>(signal: AbortSignal) => {
77+
return async (promise: Promise<T>): Promise<T> => {
78+
validateActive(signal)
79+
const result = await Promise.race([promisifyAbortSignal(signal), promise])
80+
validateActive(signal)
81+
return result
82+
}
83+
}
84+
85+
/**
86+
* Given an input `AbortSignal` and `timeoutMs` returns a promise that resolves
87+
* after `timeoutMs` or rejects as soon as `AbortSignal.abort` is `true`.
88+
* @param signal
89+
* @returns
90+
*/
91+
export const createDelay = (signal: AbortSignal) => {
92+
const pause = createPause<void>(signal)
93+
return (timeoutMs: number): Promise<void> => {
94+
return pause(new Promise<void>((resolve) => setTimeout(resolve, timeoutMs)))
95+
}
96+
}

0 commit comments

Comments
 (0)