Skip to content

Commit b96ffe1

Browse files
authored
Merge pull request #2070 from FaberVitale/fix/alm-reject-task-if-listener-is-cancelled
2 parents 6e1d675 + 91f4c08 commit b96ffe1

File tree

7 files changed

+59
-45
lines changed

7 files changed

+59
-45
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
"scripts": {
4646
"build": "yarn build:packages",
4747
"test": "yarn test:packages",
48-
"build:examples": "yarn workspaces foreach --include '@reduxjs/*' --include '@examples-query-react/*' -vtp run build",
48+
"build:examples": "yarn workspaces foreach --include '@reduxjs/*' --include '@examples-query-react/*' --include '@examples-action-listener/*' -vtp run build",
4949
"build:docs": "yarn workspace website run build",
5050
"build:packages": "yarn workspaces foreach --include '@reduxjs/*' --include '@rtk-query/*' --include '@rtk-incubator/*' --topological-dev run build",
5151
"test:packages": "yarn workspaces foreach --include '@reduxjs/*' --include '@rtk-query/*' --include '@rtk-incubator/*' run test",

packages/action-listener-middleware/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ Both these methods are cancelation-aware, and will throw a `TaskAbortError` if t
285285

286286
- `fork: (executor: (forkApi: ForkApi) => T | Promise<T>) => ForkedTask<T>`: Launches a "child task" that may be used to accomplish additional work. Accepts any sync or async function as its argument, and returns a `{result, cancel}` object that can be used to check the final status and return value of the child task, or cancel it while in-progress.
287287

288-
Child tasks can be launched, and waited on to collect their return values. The provided `executor` function will be called with a `forkApi` object containing `{pause, delay, signal}`, allowing it to pause or check cancelation status. It can also make use of the `listenerApi` from the listener's scope.
288+
Child tasks can be launched, and waited on to collect their return values. The provided `executor` function will be called asynchronously with a `forkApi` object containing `{pause, delay, signal}`, allowing it to pause or check cancelation status. It can also make use of the `listenerApi` from the listener's scope.
289289

290290
An example of this might be a listener that forks a child task containing an infinite loop that listens for events from a server. The parent then uses `listenerApi.condition()` to wait for a "stop" action, and cancels the child task.
291291

packages/action-listener-middleware/src/index.ts

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import type {
2828
ForkedTask,
2929
TypedRemoveListener,
3030
TypedStopListening,
31+
TaskResult,
3132
} from './types'
3233
import {
3334
abortControllerWithReason,
@@ -85,9 +86,6 @@ const createFork = (parentAbortSignal: AbortSignal) => {
8586
return <T>(taskExecutor: ForkedTaskExecutor<T>): ForkedTask<T> => {
8687
assertFunction(taskExecutor, 'taskExecutor')
8788
const childAbortController = new AbortController()
88-
const cancel = () => {
89-
abortControllerWithReason(childAbortController, taskCancelled)
90-
}
9189

9290
const result = runTask<T>(
9391
async (): Promise<T> => {
@@ -98,16 +96,17 @@ const createFork = (parentAbortSignal: AbortSignal) => {
9896
delay: createDelay(childAbortController.signal),
9997
signal: childAbortController.signal,
10098
})) as T
101-
validateActive(parentAbortSignal)
10299
validateActive(childAbortController.signal)
103100
return result
104101
},
105102
() => abortControllerWithReason(childAbortController, taskCompleted)
106103
)
107104

108105
return {
109-
result,
110-
cancel,
106+
result: createPause<TaskResult<T>>(parentAbortSignal)(result),
107+
cancel() {
108+
abortControllerWithReason(childAbortController, taskCancelled)
109+
},
111110
}
112111
}
113112
}

packages/action-listener-middleware/src/task.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,9 @@ export const promisifyAbortSignal = (
3737

3838
/**
3939
* Runs a task and returns promise that resolves to {@link TaskResult}.
40-
*
4140
* Second argument is an optional `cleanUp` function that always runs after task.
41+
*
42+
* **Note:** `runTask` runs the executor in the next microtask.
4243
* @returns
4344
*/
4445
export const runTask = async <T>(

packages/action-listener-middleware/src/tests/fork.test.ts

Lines changed: 26 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { EnhancedStore } from '@reduxjs/toolkit'
2-
import { configureStore, createSlice } from '@reduxjs/toolkit'
2+
import { configureStore, createSlice, createAction } from '@reduxjs/toolkit'
33

44
import type { PayloadAction } from '@reduxjs/toolkit'
55
import type { ForkedTaskExecutor, TaskResult } from '../types'
@@ -99,24 +99,23 @@ describe('fork', () => {
9999
expect(hasRunAsyncExecutor).toBe(true)
100100
})
101101

102-
it('runs forked tasks that are cancelled if parent listener is cancelled', async () => {
102+
test('forkedTask.result rejects TaskAbortError if listener is cancelled', async () => {
103103
const deferredForkedTaskError = deferred()
104104

105105
startListening({
106106
actionCreator: increment,
107-
effect: async (_, listenerApi) => {
107+
async effect(_, listenerApi) {
108108
listenerApi.cancelActiveListeners()
109-
const result = await listenerApi.fork(async () => {
110-
await delay(20)
111-
112-
throw new Error('unreachable code')
113-
}).result
109+
listenerApi
110+
.fork(async () => {
111+
await delay(10)
114112

115-
if (result.status !== 'ok') {
116-
deferredForkedTaskError.resolve(result.error)
117-
} else {
118-
deferredForkedTaskError.reject(new Error('unreachable code'))
119-
}
113+
throw new Error('unreachable code')
114+
})
115+
.result.then(
116+
deferredForkedTaskError.resolve,
117+
deferredForkedTaskError.resolve
118+
)
120119
},
121120
})
122121

@@ -386,26 +385,25 @@ describe('fork', () => {
386385
})
387386

388387
test('forkApi.pause rejects if listener is cancelled', async () => {
389-
let deferredResult = deferred()
388+
const incrementByInListener = createAction<number>('incrementByInListener')
389+
390390
startListening({
391-
actionCreator: increment,
392-
effect: async (_, listenerApi) => {
391+
actionCreator: incrementByInListener,
392+
async effect({ payload: amountToIncrement }, listenerApi) {
393393
listenerApi.cancelActiveListeners()
394-
const forkedTask = listenerApi.fork(async (forkApi) => {
395-
await forkApi.pause(delay(30))
396-
397-
return 4
398-
})
399-
deferredResult.resolve(await forkedTask.result)
394+
await listenerApi.fork(async (forkApi) => {
395+
await forkApi.pause(delay(10))
396+
listenerApi.dispatch(incrementByAmount(amountToIncrement))
397+
}).result
398+
listenerApi.dispatch(incrementByAmount(2 * amountToIncrement))
400399
},
401400
})
402401

403-
store.dispatch(increment())
404-
store.dispatch(increment())
402+
store.dispatch(incrementByInListener(10))
403+
store.dispatch(incrementByInListener(100))
405404

406-
expect(await deferredResult).toEqual({
407-
status: 'cancelled',
408-
error: new TaskAbortError(listenerCancelled),
409-
})
405+
await delay(50)
406+
407+
expect(store.getState().value).toEqual(300)
410408
})
411409
})

packages/action-listener-middleware/src/tests/listenerMiddleware.test.ts

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -709,24 +709,27 @@ describe('createListenerMiddleware', () => {
709709
})
710710

711711
test('clear() cancels all running forked tasks', async () => {
712-
const fork1Test = deferred()
712+
const store = configureStore({
713+
reducer: counterSlice.reducer,
714+
middleware: (gDM) => gDM().prepend(middleware),
715+
})
713716

714717
startListening({
715718
actionCreator: testAction1,
716-
async effect(_, { fork }) {
717-
const taskResult = await fork(() => {
718-
return 3
719-
}).result
720-
fork1Test.resolve(taskResult)
719+
async effect(_, { fork, dispatch }) {
720+
await fork(() => dispatch(incrementByAmount(3))).result
721+
dispatch(incrementByAmount(4))
721722
},
722723
})
723724

725+
expect(store.getState().value).toBe(0)
724726
store.dispatch(testAction1('a'))
725727

726728
clearListeners()
727-
store.dispatch(testAction1('b'))
728729

729-
expect(await fork1Test).toHaveProperty('status', 'cancelled')
730+
await Promise.resolve() // Forked tasks run on the next microtask.
731+
732+
expect(store.getState().value).toBe(0)
730733
})
731734
})
732735

packages/action-listener-middleware/src/types.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,19 @@ export type TaskResult<Value> =
105105
| TaskCancelled
106106

107107
export interface ForkedTask<T> {
108+
/**
109+
* A promise that resolves when the task is either completed or cancelled or rejects
110+
* if parent listener execution is cancelled or completed.
111+
*
112+
* ### Example
113+
* ```ts
114+
* const result = await fork(async (forkApi) => Promise.resolve(4)).result
115+
*
116+
* if(result.status === 'ok') {
117+
* console.log(result.value) // logs 4
118+
* }}
119+
* ```
120+
*/
108121
result: Promise<TaskResult<T>>
109122
/**
110123
* Cancel task if it is in progress or not yet started,

0 commit comments

Comments
 (0)