Skip to content

Commit 4ee19e1

Browse files
committed
rfactor: implement take using jobs
1 parent 5f41cd6 commit 4ee19e1

File tree

2 files changed

+26
-30
lines changed

2 files changed

+26
-30
lines changed

packages/action-listener-middleware/src/index.ts

Lines changed: 25 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -67,51 +67,47 @@ const actualMiddlewarePhases = ['beforeReducer', 'afterReducer'] as const
6767

6868
function createTakePattern<S>(
6969
addListener: AddListenerOverloads<Unsubscribe, S, Dispatch<AnyAction>>,
70-
parentJob?: Job<any>
70+
parentJob: Job<any>
7171
): TakePattern<S> {
7272
async function take<P extends AnyActionListenerPredicate<S>>(
7373
predicate: P,
7474
timeout: number | undefined
7575
) {
7676
let unsubscribe: Unsubscribe = () => {}
77-
let job: JobHandle
78-
79-
// TODO Need to figure out how to propagate cancelations of the job that
80-
// is locked inside of the middleware back up into here, so that a canceled
81-
// listener waiting on a condition has that condition throw instead.
82-
// Maybe rewrite these around use of `Job.pause()` instead?
83-
84-
const tuplePromise = new Promise<[AnyAction, S, S]>((resolve) => {
77+
let job: Job<[AnyAction, S, S]> = parentJob.launch(async (job) => Outcome.wrap(new Promise<[AnyAction, S, S]>((resolve) => {
8578
unsubscribe = addListener({
8679
predicate: predicate as any,
8780
listener: (action, listenerApi): void => {
8881
// One-shot listener that cleans up as soon as the predicate resolves
8982
listenerApi.unsubscribe()
90-
resolve([
83+
resolve(([
9184
action,
9285
listenerApi.getState(),
9386
listenerApi.getOriginalState(),
94-
])
87+
]))
9588
},
9689
parentJob,
9790
})
98-
})
91+
})));
9992

100-
let promises: Promise<unknown>[] = [tuplePromise]
10193

102-
if (timeout !== undefined) {
103-
const timedOutPromise = new Promise<null>((resolve, reject) => {
104-
setTimeout(() => {
105-
resolve(null)
106-
}, timeout)
107-
})
108-
promises = [tuplePromise, timedOutPromise]
109-
}
94+
let result: Outcome<[AnyAction, S, S]>;
95+
96+
try {
97+
result = await (timeout !== undefined ? job.runWithTimeout(timeout) : job.run());
98+
99+
if(result.isOk()) {
100+
return result.value;
101+
} else if(result.error instanceof JobCancellationException) {
102+
return false;
103+
}
110104

111-
const result = await Promise.race(promises)
105+
throw result.error;
106+
107+
} finally {
108+
unsubscribe()
109+
}
112110

113-
unsubscribe()
114-
return result
115111
}
116112

117113
return take as TakePattern<S>
@@ -310,11 +306,6 @@ export function createActionListenerMiddleware<
310306
return true
311307
}
312308

313-
const take = createTakePattern(addListener)
314-
const condition: ConditionFunction<S> = (predicate, timeout) => {
315-
return take(predicate, timeout).then(Boolean)
316-
}
317-
318309
const middleware: Middleware<
319310
{
320311
(action: Action<'actionListenerMiddleware/add'>): Unsubscribe
@@ -372,6 +363,11 @@ export function createActionListenerMiddleware<
372363
}
373364

374365
entry.parentJob.launchAndRun(async (jobHandle) => {
366+
const take = createTakePattern(addListener, entry.parentJob as Job<any>);
367+
const condition: ConditionFunction<S> = (predicate, timeout) => {
368+
return take(predicate, timeout).then(Boolean)
369+
}
370+
375371
const result = await Outcome.try(async () =>
376372
entry.listener(action, {
377373
...api,

packages/action-listener-middleware/src/tests/listenerMiddleware.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -850,7 +850,7 @@ describe('createActionListenerMiddleware', () => {
850850
})
851851

852852
describe('Job API', () => {
853-
test.skip('Allows canceling previous jobs', () => {
853+
test('Allows canceling previous jobs', () => {
854854
let jobsStarted = 0
855855

856856
middleware.addListener({

0 commit comments

Comments
 (0)