@@ -30,6 +30,15 @@ import type {
30
30
ListenerErrorInfo ,
31
31
} from './types'
32
32
33
+ import {
34
+ Job ,
35
+ SupervisorJob ,
36
+ JobHandle ,
37
+ JobCancellationReason ,
38
+ JobCancellationException ,
39
+ } from './job'
40
+ import { Outcome } from './outcome'
41
+
33
42
export type {
34
43
ActionListener ,
35
44
ActionListenerMiddleware ,
@@ -57,43 +66,76 @@ const defaultWhen: MiddlewarePhase = 'afterReducer'
57
66
const actualMiddlewarePhases = [ 'beforeReducer' , 'afterReducer' ] as const
58
67
59
68
function createTakePattern < S > (
60
- addListener : AddListenerOverloads < Unsubscribe , S , Dispatch < AnyAction > >
69
+ addListener : AddListenerOverloads < Unsubscribe , S , Dispatch < AnyAction > > ,
70
+ parentJob : Job < any >
61
71
) : TakePattern < S > {
72
+ /**
73
+ * A function that takes an ActionListenerPredicate and an optional timeout,
74
+ * and resolves when either the predicate returns `true` based on an action
75
+ * state combination or when the timeout expires.
76
+ * If the parent listener is canceled while waiting, this will throw a
77
+ * JobCancellationException.
78
+ */
62
79
async function take < P extends AnyActionListenerPredicate < S > > (
63
80
predicate : P ,
64
81
timeout : number | undefined
65
82
) {
83
+ // Placeholder unsubscribe function until the listener is added
66
84
let unsubscribe : Unsubscribe = ( ) => { }
67
85
68
- const tuplePromise = new Promise < [ AnyAction , S , S ] > ( ( resolve ) => {
69
- unsubscribe = addListener ( {
70
- predicate : predicate as any ,
71
- listener : ( action , listenerApi ) : void => {
72
- // One-shot listener that cleans up as soon as the predicate resolves
73
- listenerApi . unsubscribe ( )
74
- resolve ( [
75
- action ,
76
- listenerApi . getState ( ) ,
77
- listenerApi . getOriginalState ( ) ,
78
- ] )
79
- } ,
80
- } )
81
- } )
82
-
83
- if ( timeout === undefined ) {
84
- return tuplePromise
85
- }
86
+ // We'll add an additional nested Job representing this function.
87
+ // TODO This is really a duplicate of the other job inside the middleware.
88
+ // This behavior requires some additional nesting:
89
+ // We're going to create a `Promise` representing the result of the listener,
90
+ // but then wrap that in an `Outcome` for consistent error handling.
91
+ let job : Job < [ AnyAction , S , S ] > = parentJob . launch ( async ( job ) =>
92
+ Outcome . wrap (
93
+ new Promise < [ AnyAction , S , S ] > ( ( resolve ) => {
94
+ // Inside the Promise, we synchronously add the listener.
95
+ unsubscribe = addListener ( {
96
+ predicate : predicate as any ,
97
+ listener : ( action , listenerApi ) : void => {
98
+ // One-shot listener that cleans up as soon as the predicate passes
99
+ listenerApi . unsubscribe ( )
100
+ // Resolve the promise with the same arguments the predicate saw
101
+ resolve ( [
102
+ action ,
103
+ listenerApi . getState ( ) ,
104
+ listenerApi . getOriginalState ( ) ,
105
+ ] )
106
+ } ,
107
+ parentJob,
108
+ } )
109
+ } )
110
+ )
111
+ )
86
112
87
- const timedOutPromise = new Promise < null > ( ( resolve , reject ) => {
88
- setTimeout ( ( ) => {
89
- resolve ( null )
90
- } , timeout )
91
- } )
113
+ let result : Outcome < [ AnyAction , S , S ] >
92
114
93
- const result = await Promise . race ( [ tuplePromise , timedOutPromise ] )
115
+ try {
116
+ // Run the job and use the timeout if given
117
+ result = await ( timeout !== undefined
118
+ ? job . runWithTimeout ( timeout )
119
+ : job . run ( ) )
94
120
95
- unsubscribe ( )
96
- return result
121
+ if ( result . isOk ( ) ) {
122
+ // Resolve the actual `take` promise with the action+states
123
+ return result . value
124
+ } else {
125
+ if (
126
+ result . error instanceof JobCancellationException &&
127
+ result . error . reason === JobCancellationReason . JobCancelled
128
+ ) {
129
+ // The `take` job itself was canceled due to timeout.
130
+ return null
131
+ }
132
+ // The parent was canceled - reject this promise with that error
133
+ throw result . error
134
+ }
135
+ } finally {
136
+ // Always clean up the listener
137
+ unsubscribe ( )
138
+ }
97
139
}
98
140
99
141
return take as TakePattern < S >
@@ -114,8 +156,12 @@ export const createListenerEntry: TypedCreateListenerEntry<unknown> = (
114
156
predicate = options . actionCreator . match
115
157
} else if ( 'matcher' in options ) {
116
158
predicate = options . matcher
117
- } else {
159
+ } else if ( 'predicate' in options ) {
118
160
predicate = options . predicate
161
+ } else {
162
+ throw new Error (
163
+ 'Creating a listener requires one of the known fields for matching against actions'
164
+ )
119
165
}
120
166
121
167
const id = nanoid ( )
@@ -128,6 +174,7 @@ export const createListenerEntry: TypedCreateListenerEntry<unknown> = (
128
174
unsubscribe : ( ) => {
129
175
throw new Error ( 'Unsubscribe not initialized' )
130
176
} ,
177
+ parentJob : new SupervisorJob ( ) ,
131
178
}
132
179
133
180
return entry
@@ -287,11 +334,6 @@ export function createActionListenerMiddleware<
287
334
return true
288
335
}
289
336
290
- const take = createTakePattern ( addListener )
291
- const condition : ConditionFunction < S > = ( predicate , timeout ) => {
292
- return take ( predicate , timeout ) . then ( Boolean )
293
- }
294
-
295
337
const middleware : Middleware <
296
338
{
297
339
( action : Action < 'actionListenerMiddleware/add' > ) : Unsubscribe
@@ -338,7 +380,6 @@ export function createActionListenerMiddleware<
338
380
runListener = false
339
381
340
382
safelyNotifyError ( onError , predicateError , {
341
- async : false ,
342
383
raisedBy : 'predicate' ,
343
384
phase : currentPhase ,
344
385
} )
@@ -349,38 +390,47 @@ export function createActionListenerMiddleware<
349
390
continue
350
391
}
351
392
352
- try {
353
- let promiseLikeOrUndefined = entry . listener ( action , {
354
- ...api ,
355
- getOriginalState,
356
- condition,
357
- take,
358
- currentPhase,
359
- extra,
360
- unsubscribe : entry . unsubscribe ,
361
- subscribe : ( ) => {
362
- listenerMap . set ( entry . id , entry )
363
- } ,
364
- } )
393
+ entry . parentJob . launchAndRun ( async ( jobHandle ) => {
394
+ const take = createTakePattern ( addListener , jobHandle as Job < any > )
395
+ const condition : ConditionFunction < S > = ( predicate , timeout ) => {
396
+ return take ( predicate , timeout ) . then ( Boolean )
397
+ }
365
398
366
- if ( promiseLikeOrUndefined ) {
367
- Promise . resolve ( promiseLikeOrUndefined ) . catch (
368
- ( asyncListenerError ) => {
369
- safelyNotifyError ( onError , asyncListenerError , {
370
- async : true ,
371
- raisedBy : 'listener' ,
372
- phase : currentPhase ,
373
- } )
374
- }
375
- )
399
+ const result = await Outcome . try ( async ( ) =>
400
+ entry . listener ( action , {
401
+ ...api ,
402
+ getOriginalState,
403
+ condition,
404
+ take,
405
+ currentPhase,
406
+ extra,
407
+ unsubscribe : entry . unsubscribe ,
408
+ subscribe : ( ) => {
409
+ listenerMap . set ( entry . id , entry )
410
+ } ,
411
+ job : jobHandle ,
412
+ cancelPrevious : ( ) => {
413
+ entry . parentJob . cancelChildren (
414
+ new JobCancellationException (
415
+ JobCancellationReason . JobCancelled
416
+ ) ,
417
+ [ jobHandle ]
418
+ )
419
+ } ,
420
+ } )
421
+ )
422
+ if (
423
+ result . isError ( ) &&
424
+ ! ( result . error instanceof JobCancellationException )
425
+ ) {
426
+ safelyNotifyError ( onError , result . error , {
427
+ raisedBy : 'listener' ,
428
+ phase : currentPhase ,
429
+ } )
376
430
}
377
- } catch ( syncListenerError ) {
378
- safelyNotifyError ( onError , syncListenerError , {
379
- async : false ,
380
- raisedBy : 'listener' ,
381
- phase : currentPhase ,
382
- } )
383
- }
431
+
432
+ return Outcome . ok ( 1 )
433
+ } )
384
434
}
385
435
if ( currentPhase === 'beforeReducer' ) {
386
436
result = next ( action )
0 commit comments