Skip to content

Commit 10ec32e

Browse files
committed
Fix take cancelation behavior and fill out tests
1 parent 4ee19e1 commit 10ec32e

File tree

3 files changed

+107
-60
lines changed

3 files changed

+107
-60
lines changed

packages/action-listener-middleware/src/index.ts

Lines changed: 60 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -69,45 +69,73 @@ function createTakePattern<S>(
6969
addListener: AddListenerOverloads<Unsubscribe, S, Dispatch<AnyAction>>,
7070
parentJob: Job<any>
7171
): TakePattern<S> {
72+
/**
73+
* A function that takes an ActionListenerPredicate and an optional timeout,
74+
* and resolves when either the predicate returns `true` based on an action
75+
* state combination or when the timeout expires.
76+
* If the parent listener is canceled while waiting, this will throw a
77+
* JobCancellationException.
78+
*/
7279
async function take<P extends AnyActionListenerPredicate<S>>(
7380
predicate: P,
7481
timeout: number | undefined
7582
) {
83+
// Placeholder unsubscribe function until the listener is added
7684
let unsubscribe: Unsubscribe = () => {}
77-
let job: Job<[AnyAction, S, S]> = parentJob.launch(async (job) => Outcome.wrap(new Promise<[AnyAction, S, S]>((resolve) => {
78-
unsubscribe = addListener({
79-
predicate: predicate as any,
80-
listener: (action, listenerApi): void => {
81-
// One-shot listener that cleans up as soon as the predicate resolves
82-
listenerApi.unsubscribe()
83-
resolve(([
84-
action,
85-
listenerApi.getState(),
86-
listenerApi.getOriginalState(),
87-
]))
88-
},
89-
parentJob,
90-
})
91-
})));
92-
93-
94-
let result: Outcome<[AnyAction, S, S]>;
9585

96-
try {
97-
result = await (timeout !== undefined ? job.runWithTimeout(timeout) : job.run());
98-
99-
if(result.isOk()) {
100-
return result.value;
101-
} else if(result.error instanceof JobCancellationException) {
102-
return false;
103-
}
86+
// We'll add an additional nested Job representing this function.
87+
// TODO This is really a duplicate of the other job inside the middleware.
88+
// This behavior requires some additional nesting:
89+
// We're going to create a `Promise` representing the result of the listener,
90+
// but then wrap that in an `Outcome` for consistent error handling.
91+
let job: Job<[AnyAction, S, S]> = parentJob.launch(async (job) =>
92+
Outcome.wrap(
93+
new Promise<[AnyAction, S, S]>((resolve) => {
94+
// Inside the Promise, we synchronously add the listener.
95+
unsubscribe = addListener({
96+
predicate: predicate as any,
97+
listener: (action, listenerApi): void => {
98+
// One-shot listener that cleans up as soon as the predicate passes
99+
listenerApi.unsubscribe()
100+
// Resolve the promise with the same arguments the predicate saw
101+
resolve([
102+
action,
103+
listenerApi.getState(),
104+
listenerApi.getOriginalState(),
105+
])
106+
},
107+
parentJob,
108+
})
109+
})
110+
)
111+
)
104112

105-
throw result.error;
113+
let result: Outcome<[AnyAction, S, S]>
106114

115+
try {
116+
// Run the job and use the timeout if given
117+
result = await (timeout !== undefined
118+
? job.runWithTimeout(timeout)
119+
: job.run())
120+
121+
if (result.isOk()) {
122+
// Resolve the actual `take` promise with the action+states
123+
return result.value
124+
} else {
125+
if (
126+
result.error instanceof JobCancellationException &&
127+
result.error.reason === JobCancellationReason.JobCancelled
128+
) {
129+
// The `take` job itself was canceled due to timeout.
130+
return null
131+
}
132+
// The parent was canceled - reject this promise with that error
133+
throw result.error
134+
}
107135
} finally {
136+
// Always clean up the listener
108137
unsubscribe()
109138
}
110-
111139
}
112140

113141
return take as TakePattern<S>
@@ -363,10 +391,10 @@ export function createActionListenerMiddleware<
363391
}
364392

365393
entry.parentJob.launchAndRun(async (jobHandle) => {
366-
const take = createTakePattern(addListener, entry.parentJob as Job<any>);
367-
const condition: ConditionFunction<S> = (predicate, timeout) => {
368-
return take(predicate, timeout).then(Boolean)
369-
}
394+
const take = createTakePattern(addListener, jobHandle as Job<any>)
395+
const condition: ConditionFunction<S> = (predicate, timeout) => {
396+
return take(predicate, timeout).then(Boolean)
397+
}
370398

371399
const result = await Outcome.try(async () =>
372400
entry.listener(action, {

packages/action-listener-middleware/src/job.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,10 @@ export class JobCancellationException implements Error {
5353
* [JobCompleted]: The current job was already completed. This only happens if the same job is run more than once.
5454
*/
5555
export enum JobCancellationReason {
56-
ParentJobCancelled,
57-
ParentJobCompleted,
58-
JobCancelled,
59-
JobCompleted,
56+
ParentJobCancelled = 'ParentJobCancelled',
57+
ParentJobCompleted = 'ParentJobCompleted',
58+
JobCancelled = 'JobCancelled',
59+
JobCompleted = 'JobCompleted',
6060
}
6161

6262
/**

packages/action-listener-middleware/src/tests/listenerMiddleware.test.ts

Lines changed: 43 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import type {
2121
TypedAddListener,
2222
Unsubscribe,
2323
} from '../index'
24+
import { JobCancellationException } from '../job'
25+
import { createNonNullChain } from 'typescript'
2426

2527
const middlewareApi = {
2628
getState: expect.any(Function),
@@ -732,40 +734,42 @@ describe('createActionListenerMiddleware', () => {
732734
middleware: (gDM) => gDM().prepend(middleware),
733735
})
734736

737+
let takeResult: any = undefined
738+
735739
middleware.addListener({
736740
predicate: incrementByAmount.match,
737741
listener: async (_, listenerApi) => {
738-
const result = await listenerApi.take(increment.match, 50)
739-
740-
expect(result).toBe(null)
742+
takeResult = await listenerApi.take(increment.match, 15)
741743
},
742744
})
743745
store.dispatch(incrementByAmount(1))
744-
await delay(200)
746+
await delay(25)
747+
748+
expect(takeResult).toBe(null)
745749
})
746750

747-
test("take resolves to [A, CurrentState, PreviousState] if the timeout is provided but doesn't expires", (done) => {
751+
test("take resolves to [A, CurrentState, PreviousState] if the timeout is provided but doesn't expires", async () => {
748752
const store = configureStore({
749753
reducer: counterSlice.reducer,
750754
middleware: (gDM) => gDM().prepend(middleware),
751755
})
756+
let takeResult: any = undefined
757+
let stateBefore: any = undefined
758+
let stateCurrent: any = undefined
752759

753760
middleware.addListener({
754761
predicate: incrementByAmount.match,
755762
listener: async (_, listenerApi) => {
756-
const stateBefore = listenerApi.getState()
757-
const result = await listenerApi.take(increment.match, 50)
758-
759-
expect(result).toEqual([
760-
increment(),
761-
listenerApi.getState(),
762-
stateBefore,
763-
])
764-
done()
763+
stateBefore = listenerApi.getState()
764+
takeResult = await listenerApi.take(increment.match, 50)
765+
stateCurrent = listenerApi.getState()
765766
},
766767
})
767768
store.dispatch(incrementByAmount(1))
768769
store.dispatch(increment())
770+
771+
await delay(25)
772+
expect(takeResult).toEqual([increment(), stateCurrent, stateBefore])
769773
})
770774

771775
test('condition method resolves promise when the predicate succeeds', async () => {
@@ -799,11 +803,11 @@ describe('createActionListenerMiddleware', () => {
799803

800804
store.dispatch(increment())
801805
expect(listenerStarted).toBe(true)
802-
await delay(50)
806+
await delay(25)
803807
store.dispatch(increment())
804808
store.dispatch(increment())
805809

806-
await delay(50)
810+
await delay(25)
807811

808812
expect(finalCount).toBe(3)
809813
})
@@ -828,7 +832,7 @@ describe('createActionListenerMiddleware', () => {
828832
listenerStarted = true
829833
const result = await listenerApi.condition((action, currentState) => {
830834
return (currentState as CounterState).value === 3
831-
}, 50)
835+
}, 25)
832836

833837
expect(result).toBe(false)
834838
const latestState = listenerApi.getState() as CounterState
@@ -842,34 +846,49 @@ describe('createActionListenerMiddleware', () => {
842846

843847
store.dispatch(increment())
844848

845-
await delay(150)
849+
await delay(50)
846850
store.dispatch(increment())
847851

848852
expect(finalCount).toBe(2)
849853
})
850854
})
851855

852856
describe('Job API', () => {
853-
test('Allows canceling previous jobs', () => {
857+
test('Allows canceling previous jobs', async () => {
854858
let jobsStarted = 0
859+
let jobsContinued = 0
860+
let jobsCanceled = 0
855861

856862
middleware.addListener({
857863
actionCreator: increment,
858864
listener: async (action, listenerApi) => {
859865
jobsStarted++
860866

861867
if (jobsStarted < 3) {
862-
await listenerApi.condition(decrement.match)
863-
// Cancelation _should_ cause `condition()` to throw so we never
864-
// end up hitting this next line
865-
console.log('Continuing after decrement')
868+
try {
869+
await listenerApi.condition(decrement.match)
870+
// Cancelation _should_ cause `condition()` to throw so we never
871+
// end up hitting this next line
872+
jobsContinued++
873+
} catch (err) {
874+
if (err instanceof JobCancellationException) {
875+
jobsCanceled++
876+
}
877+
}
866878
} else {
867879
listenerApi.cancelPrevious()
868880
}
869881
},
870882
})
871883

872-
// TODO Write the rest of this test
884+
store.dispatch(increment())
885+
store.dispatch(increment())
886+
store.dispatch(increment())
887+
888+
await delay(10)
889+
expect(jobsStarted).toBe(3)
890+
expect(jobsContinued).toBe(0)
891+
expect(jobsCanceled).toBe(2)
873892
})
874893
})
875894

0 commit comments

Comments
 (0)