Skip to content

Commit e2eac2a

Browse files
authored
Merge pull request #1799 from FaberVitale/feature/middleware-jobs--take-implemented-with-jobs
2 parents 5f41cd6 + 10ec32e commit e2eac2a

File tree

3 files changed

+113
-70
lines changed

3 files changed

+113
-70
lines changed

packages/action-listener-middleware/src/index.ts

Lines changed: 66 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -67,51 +67,75 @@ const actualMiddlewarePhases = ['beforeReducer', 'afterReducer'] as const
6767

6868
function createTakePattern<S>(
6969
addListener: AddListenerOverloads<Unsubscribe, S, Dispatch<AnyAction>>,
70-
parentJob?: Job<any>
70+
parentJob: Job<any>
7171
): TakePattern<S> {
72+
/**
73+
* A function that takes an ActionListenerPredicate and an optional timeout,
74+
* and resolves when either the predicate returns `true` based on an action
75+
* state combination or when the timeout expires.
76+
* If the parent listener is canceled while waiting, this will throw a
77+
* JobCancellationException.
78+
*/
7279
async function take<P extends AnyActionListenerPredicate<S>>(
7380
predicate: P,
7481
timeout: number | undefined
7582
) {
83+
// Placeholder unsubscribe function until the listener is added
7684
let unsubscribe: Unsubscribe = () => {}
77-
let job: JobHandle
78-
79-
// TODO Need to figure out how to propagate cancelations of the job that
80-
// is locked inside of the middleware back up into here, so that a canceled
81-
// listener waiting on a condition has that condition throw instead.
82-
// Maybe rewrite these around use of `Job.pause()` instead?
83-
84-
const tuplePromise = new Promise<[AnyAction, S, S]>((resolve) => {
85-
unsubscribe = addListener({
86-
predicate: predicate as any,
87-
listener: (action, listenerApi): void => {
88-
// One-shot listener that cleans up as soon as the predicate resolves
89-
listenerApi.unsubscribe()
90-
resolve([
91-
action,
92-
listenerApi.getState(),
93-
listenerApi.getOriginalState(),
94-
])
95-
},
96-
parentJob,
97-
})
98-
})
99-
100-
let promises: Promise<unknown>[] = [tuplePromise]
101-
102-
if (timeout !== undefined) {
103-
const timedOutPromise = new Promise<null>((resolve, reject) => {
104-
setTimeout(() => {
105-
resolve(null)
106-
}, timeout)
107-
})
108-
promises = [tuplePromise, timedOutPromise]
109-
}
11085

111-
const result = await Promise.race(promises)
86+
// We'll add an additional nested Job representing this function.
87+
// TODO This is really a duplicate of the other job inside the middleware.
88+
// This behavior requires some additional nesting:
89+
// We're going to create a `Promise` representing the result of the listener,
90+
// but then wrap that in an `Outcome` for consistent error handling.
91+
let job: Job<[AnyAction, S, S]> = parentJob.launch(async (job) =>
92+
Outcome.wrap(
93+
new Promise<[AnyAction, S, S]>((resolve) => {
94+
// Inside the Promise, we synchronously add the listener.
95+
unsubscribe = addListener({
96+
predicate: predicate as any,
97+
listener: (action, listenerApi): void => {
98+
// One-shot listener that cleans up as soon as the predicate passes
99+
listenerApi.unsubscribe()
100+
// Resolve the promise with the same arguments the predicate saw
101+
resolve([
102+
action,
103+
listenerApi.getState(),
104+
listenerApi.getOriginalState(),
105+
])
106+
},
107+
parentJob,
108+
})
109+
})
110+
)
111+
)
112112

113-
unsubscribe()
114-
return result
113+
let result: Outcome<[AnyAction, S, S]>
114+
115+
try {
116+
// Run the job and use the timeout if given
117+
result = await (timeout !== undefined
118+
? job.runWithTimeout(timeout)
119+
: job.run())
120+
121+
if (result.isOk()) {
122+
// Resolve the actual `take` promise with the action+states
123+
return result.value
124+
} else {
125+
if (
126+
result.error instanceof JobCancellationException &&
127+
result.error.reason === JobCancellationReason.JobCancelled
128+
) {
129+
// The `take` job itself was canceled due to timeout.
130+
return null
131+
}
132+
// The parent was canceled - reject this promise with that error
133+
throw result.error
134+
}
135+
} finally {
136+
// Always clean up the listener
137+
unsubscribe()
138+
}
115139
}
116140

117141
return take as TakePattern<S>
@@ -310,11 +334,6 @@ export function createActionListenerMiddleware<
310334
return true
311335
}
312336

313-
const take = createTakePattern(addListener)
314-
const condition: ConditionFunction<S> = (predicate, timeout) => {
315-
return take(predicate, timeout).then(Boolean)
316-
}
317-
318337
const middleware: Middleware<
319338
{
320339
(action: Action<'actionListenerMiddleware/add'>): Unsubscribe
@@ -372,6 +391,11 @@ export function createActionListenerMiddleware<
372391
}
373392

374393
entry.parentJob.launchAndRun(async (jobHandle) => {
394+
const take = createTakePattern(addListener, jobHandle as Job<any>)
395+
const condition: ConditionFunction<S> = (predicate, timeout) => {
396+
return take(predicate, timeout).then(Boolean)
397+
}
398+
375399
const result = await Outcome.try(async () =>
376400
entry.listener(action, {
377401
...api,

packages/action-listener-middleware/src/job.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,10 @@ export class JobCancellationException implements Error {
5353
* [JobCompleted]: The current job was already completed. This only happens if the same job is run more than once.
5454
*/
5555
export enum JobCancellationReason {
56-
ParentJobCancelled,
57-
ParentJobCompleted,
58-
JobCancelled,
59-
JobCompleted,
56+
ParentJobCancelled = 'ParentJobCancelled',
57+
ParentJobCompleted = 'ParentJobCompleted',
58+
JobCancelled = 'JobCancelled',
59+
JobCompleted = 'JobCompleted',
6060
}
6161

6262
/**

packages/action-listener-middleware/src/tests/listenerMiddleware.test.ts

Lines changed: 43 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import type {
2121
TypedAddListener,
2222
Unsubscribe,
2323
} from '../index'
24+
import { JobCancellationException } from '../job'
25+
import { createNonNullChain } from 'typescript'
2426

2527
const middlewareApi = {
2628
getState: expect.any(Function),
@@ -732,40 +734,42 @@ describe('createActionListenerMiddleware', () => {
732734
middleware: (gDM) => gDM().prepend(middleware),
733735
})
734736

737+
let takeResult: any = undefined
738+
735739
middleware.addListener({
736740
predicate: incrementByAmount.match,
737741
listener: async (_, listenerApi) => {
738-
const result = await listenerApi.take(increment.match, 50)
739-
740-
expect(result).toBe(null)
742+
takeResult = await listenerApi.take(increment.match, 15)
741743
},
742744
})
743745
store.dispatch(incrementByAmount(1))
744-
await delay(200)
746+
await delay(25)
747+
748+
expect(takeResult).toBe(null)
745749
})
746750

747-
test("take resolves to [A, CurrentState, PreviousState] if the timeout is provided but doesn't expires", (done) => {
751+
test("take resolves to [A, CurrentState, PreviousState] if the timeout is provided but doesn't expires", async () => {
748752
const store = configureStore({
749753
reducer: counterSlice.reducer,
750754
middleware: (gDM) => gDM().prepend(middleware),
751755
})
756+
let takeResult: any = undefined
757+
let stateBefore: any = undefined
758+
let stateCurrent: any = undefined
752759

753760
middleware.addListener({
754761
predicate: incrementByAmount.match,
755762
listener: async (_, listenerApi) => {
756-
const stateBefore = listenerApi.getState()
757-
const result = await listenerApi.take(increment.match, 50)
758-
759-
expect(result).toEqual([
760-
increment(),
761-
listenerApi.getState(),
762-
stateBefore,
763-
])
764-
done()
763+
stateBefore = listenerApi.getState()
764+
takeResult = await listenerApi.take(increment.match, 50)
765+
stateCurrent = listenerApi.getState()
765766
},
766767
})
767768
store.dispatch(incrementByAmount(1))
768769
store.dispatch(increment())
770+
771+
await delay(25)
772+
expect(takeResult).toEqual([increment(), stateCurrent, stateBefore])
769773
})
770774

771775
test('condition method resolves promise when the predicate succeeds', async () => {
@@ -799,11 +803,11 @@ describe('createActionListenerMiddleware', () => {
799803

800804
store.dispatch(increment())
801805
expect(listenerStarted).toBe(true)
802-
await delay(50)
806+
await delay(25)
803807
store.dispatch(increment())
804808
store.dispatch(increment())
805809

806-
await delay(50)
810+
await delay(25)
807811

808812
expect(finalCount).toBe(3)
809813
})
@@ -828,7 +832,7 @@ describe('createActionListenerMiddleware', () => {
828832
listenerStarted = true
829833
const result = await listenerApi.condition((action, currentState) => {
830834
return (currentState as CounterState).value === 3
831-
}, 50)
835+
}, 25)
832836

833837
expect(result).toBe(false)
834838
const latestState = listenerApi.getState() as CounterState
@@ -842,34 +846,49 @@ describe('createActionListenerMiddleware', () => {
842846

843847
store.dispatch(increment())
844848

845-
await delay(150)
849+
await delay(50)
846850
store.dispatch(increment())
847851

848852
expect(finalCount).toBe(2)
849853
})
850854
})
851855

852856
describe('Job API', () => {
853-
test.skip('Allows canceling previous jobs', () => {
857+
test('Allows canceling previous jobs', async () => {
854858
let jobsStarted = 0
859+
let jobsContinued = 0
860+
let jobsCanceled = 0
855861

856862
middleware.addListener({
857863
actionCreator: increment,
858864
listener: async (action, listenerApi) => {
859865
jobsStarted++
860866

861867
if (jobsStarted < 3) {
862-
await listenerApi.condition(decrement.match)
863-
// Cancelation _should_ cause `condition()` to throw so we never
864-
// end up hitting this next line
865-
console.log('Continuing after decrement')
868+
try {
869+
await listenerApi.condition(decrement.match)
870+
// Cancelation _should_ cause `condition()` to throw so we never
871+
// end up hitting this next line
872+
jobsContinued++
873+
} catch (err) {
874+
if (err instanceof JobCancellationException) {
875+
jobsCanceled++
876+
}
877+
}
866878
} else {
867879
listenerApi.cancelPrevious()
868880
}
869881
},
870882
})
871883

872-
// TODO Write the rest of this test
884+
store.dispatch(increment())
885+
store.dispatch(increment())
886+
store.dispatch(increment())
887+
888+
await delay(10)
889+
expect(jobsStarted).toBe(3)
890+
expect(jobsContinued).toBe(0)
891+
expect(jobsCanceled).toBe(2)
873892
})
874893
})
875894

0 commit comments

Comments
 (0)