Skip to content

Commit 9dbdde5

Browse files
authored
Merge pull request #2759 from reduxjs/feature/middleware-internal-subscriptions
2 parents 73abb6a + 6975282 commit 9dbdde5

20 files changed

+456
-191
lines changed
Lines changed: 67 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import type { QueryThunk, RejectedAction } from '../buildThunks'
22
import type { InternalHandlerBuilder } from './types'
3+
import type { SubscriptionState } from '../apiState'
4+
import { produceWithPatches } from 'immer'
35

46
// Copied from https://github.com/feross/queue-microtask
57
let promise: Promise<any>
@@ -14,39 +16,75 @@ const queueMicrotaskShim =
1416
}, 0)
1517
)
1618

17-
export const buildBatchedActionsHandler: InternalHandlerBuilder<boolean> = ({
18-
api,
19-
queryThunk,
20-
}) => {
21-
let abortedQueryActionsQueue: RejectedAction<QueryThunk, any>[] = []
19+
export const buildBatchedActionsHandler: InternalHandlerBuilder<
20+
[actionShouldContinue: boolean, subscriptionExists: boolean]
21+
> = ({ api, queryThunk }) => {
22+
const { actuallyMutateSubscriptions } = api.internalActions
23+
const subscriptionsPrefix = `${api.reducerPath}/subscriptions`
24+
25+
let previousSubscriptions: SubscriptionState =
26+
null as unknown as SubscriptionState
27+
2228
let dispatchQueued = false
2329

24-
return (action, mwApi) => {
25-
if (queryThunk.rejected.match(action)) {
26-
const { condition, arg } = action.meta
27-
28-
if (condition && arg.subscribe) {
29-
// request was aborted due to condition (another query already running)
30-
// _Don't_ dispatch right away - queue it for a debounced grouped dispatch
31-
abortedQueryActionsQueue.push(action)
32-
33-
if (!dispatchQueued) {
34-
queueMicrotaskShim(() => {
35-
mwApi.dispatch(
36-
api.internalActions.subscriptionRequestsRejected(
37-
abortedQueryActionsQueue
38-
)
39-
)
40-
abortedQueryActionsQueue = []
41-
dispatchQueued = false
42-
})
43-
dispatchQueued = true
44-
}
45-
// _Don't_ let the action reach the reducers now!
46-
return false
30+
return (action, mwApi, internalState) => {
31+
if (!previousSubscriptions) {
32+
// Initialize it the first time this handler runs
33+
previousSubscriptions = JSON.parse(
34+
JSON.stringify(internalState.currentSubscriptions)
35+
)
36+
}
37+
38+
// Intercept requests by hooks to see if they're subscribed
39+
// Necessary because we delay updating store state to the end of the tick
40+
if (api.internalActions.internal_probeSubscription.match(action)) {
41+
const { queryCacheKey, requestId } = action.payload
42+
const hasSubscription =
43+
!!internalState.currentSubscriptions[queryCacheKey]?.[requestId]
44+
return [false, hasSubscription]
45+
}
46+
47+
// Update subscription data based on this action
48+
const didMutate = actuallyMutateSubscriptions(
49+
internalState.currentSubscriptions,
50+
action
51+
)
52+
53+
if (didMutate) {
54+
if (!dispatchQueued) {
55+
queueMicrotaskShim(() => {
56+
// Deep clone the current subscription data
57+
const newSubscriptions: SubscriptionState = JSON.parse(
58+
JSON.stringify(internalState.currentSubscriptions)
59+
)
60+
// Figure out a smaller diff between original and current
61+
const [, patches] = produceWithPatches(
62+
previousSubscriptions,
63+
() => newSubscriptions
64+
)
65+
66+
// Sync the store state for visibility
67+
mwApi.next(api.internalActions.subscriptionsUpdated(patches))
68+
// Save the cloned state for later reference
69+
previousSubscriptions = newSubscriptions
70+
dispatchQueued = false
71+
})
72+
dispatchQueued = true
4773
}
74+
75+
const isSubscriptionSliceAction =
76+
!!action.type?.startsWith(subscriptionsPrefix)
77+
const isAdditionalSubscriptionAction =
78+
queryThunk.rejected.match(action) &&
79+
action.meta.condition &&
80+
!!action.meta.arg.subscribe
81+
82+
const actionShouldContinue =
83+
!isSubscriptionSliceAction && !isAdditionalSubscriptionAction
84+
85+
return [actionShouldContinue, false]
4886
}
4987

50-
return true
88+
return [true, false]
5189
}
5290
}

packages/toolkit/src/query/core/buildMiddleware/cacheCollection.ts

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import type {
77
TimeoutId,
88
InternalHandlerBuilder,
99
ApiMiddlewareInternalHandler,
10+
InternalMiddlewareState,
1011
} from './types'
1112

1213
export type ReferenceCacheCollection = never
@@ -54,16 +55,19 @@ export const buildCacheCollectionHandler: InternalHandlerBuilder = ({
5455

5556
function anySubscriptionsRemainingForKey(
5657
queryCacheKey: string,
57-
api: SubMiddlewareApi
58+
internalState: InternalMiddlewareState
5859
) {
59-
const subscriptions =
60-
api.getState()[reducerPath].subscriptions[queryCacheKey]
60+
const subscriptions = internalState.currentSubscriptions[queryCacheKey]
6161
return !!subscriptions && !isObjectEmpty(subscriptions)
6262
}
6363

6464
const currentRemovalTimeouts: QueryStateMeta<TimeoutId> = {}
6565

66-
const handler: ApiMiddlewareInternalHandler = (action, mwApi) => {
66+
const handler: ApiMiddlewareInternalHandler = (
67+
action,
68+
mwApi,
69+
internalState
70+
) => {
6771
if (unsubscribeQueryResult.match(action)) {
6872
const state = mwApi.getState()[reducerPath]
6973
const { queryCacheKey } = action.payload
@@ -72,6 +76,7 @@ export const buildCacheCollectionHandler: InternalHandlerBuilder = ({
7276
queryCacheKey,
7377
state.queries[queryCacheKey]?.endpointName,
7478
mwApi,
79+
internalState,
7580
state.config
7681
)
7782
}
@@ -94,6 +99,7 @@ export const buildCacheCollectionHandler: InternalHandlerBuilder = ({
9499
queryCacheKey as QueryCacheKey,
95100
queryState?.endpointName,
96101
mwApi,
102+
internalState,
97103
state.config
98104
)
99105
}
@@ -104,6 +110,7 @@ export const buildCacheCollectionHandler: InternalHandlerBuilder = ({
104110
queryCacheKey: QueryCacheKey,
105111
endpointName: string | undefined,
106112
api: SubMiddlewareApi,
113+
internalState: InternalMiddlewareState,
107114
config: ConfigState<string>
108115
) {
109116
const endpointDefinition = context.endpointDefinitions[
@@ -125,13 +132,13 @@ export const buildCacheCollectionHandler: InternalHandlerBuilder = ({
125132
Math.min(keepUnusedDataFor, THIRTY_TWO_BIT_MAX_TIMER_SECONDS)
126133
)
127134

128-
if (!anySubscriptionsRemainingForKey(queryCacheKey, api)) {
135+
if (!anySubscriptionsRemainingForKey(queryCacheKey, internalState)) {
129136
const currentTimeout = currentRemovalTimeouts[queryCacheKey]
130137
if (currentTimeout) {
131138
clearTimeout(currentTimeout)
132139
}
133140
currentRemovalTimeouts[queryCacheKey] = setTimeout(() => {
134-
if (!anySubscriptionsRemainingForKey(queryCacheKey, api)) {
141+
if (!anySubscriptionsRemainingForKey(queryCacheKey, internalState)) {
135142
api.dispatch(removeQueryResult({ queryCacheKey }))
136143
}
137144
delete currentRemovalTimeouts![queryCacheKey]

packages/toolkit/src/query/core/buildMiddleware/cacheLifecycle.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ export const buildCacheLifecycleHandler: InternalHandlerBuilder = ({
197197
const handler: ApiMiddlewareInternalHandler = (
198198
action,
199199
mwApi,
200+
internalState,
200201
stateBefore
201202
) => {
202203
const cacheKey = getCacheKey(action)

packages/toolkit/src/query/core/buildMiddleware/index.ts

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,11 @@ import type { QueryThunkArg } from '../buildThunks'
1010
import { buildCacheCollectionHandler } from './cacheCollection'
1111
import { buildInvalidationByTagsHandler } from './invalidationByTags'
1212
import { buildPollingHandler } from './polling'
13-
import type { BuildMiddlewareInput, InternalHandlerBuilder } from './types'
13+
import type {
14+
BuildMiddlewareInput,
15+
InternalHandlerBuilder,
16+
InternalMiddlewareState,
17+
} from './types'
1418
import { buildWindowEventHandler } from './windowEventHandling'
1519
import { buildCacheLifecycleHandler } from './cacheLifecycle'
1620
import { buildQueryLifecycleHandler } from './queryLifecycle'
@@ -69,6 +73,10 @@ export function buildMiddleware<
6973
const batchedActionsHandler = buildBatchedActionsHandler(builderArgs)
7074
const windowEventsHandler = buildWindowEventHandler(builderArgs)
7175

76+
let internalState: InternalMiddlewareState = {
77+
currentSubscriptions: {},
78+
}
79+
7280
return (next) => {
7381
return (action) => {
7482
if (!initialized) {
@@ -77,19 +85,30 @@ export function buildMiddleware<
7785
mwApi.dispatch(api.internalActions.middlewareRegistered(apiUid))
7886
}
7987

88+
const mwApiWithNext = { ...mwApi, next }
89+
8090
const stateBefore = mwApi.getState()
8191

82-
if (!batchedActionsHandler(action, mwApi, stateBefore)) {
83-
return
84-
}
92+
const [actionShouldContinue, hasSubscription] = batchedActionsHandler(
93+
action,
94+
mwApiWithNext,
95+
internalState,
96+
stateBefore
97+
)
98+
99+
let res: any
85100

86-
const res = next(action)
101+
if (actionShouldContinue) {
102+
res = next(action)
103+
} else {
104+
res = hasSubscription
105+
}
87106

88107
if (!!mwApi.getState()[reducerPath]) {
89108
// Only run these checks if the middleware is registered okay
90109

91110
// This looks for actions that aren't specific to the API slice
92-
windowEventsHandler(action, mwApi, stateBefore)
111+
windowEventsHandler(action, mwApiWithNext, internalState, stateBefore)
93112

94113
if (
95114
isThisApiSliceAction(action) ||
@@ -98,7 +117,7 @@ export function buildMiddleware<
98117
// Only run these additional checks if the actions are part of the API slice,
99118
// or the action has hydration-related data
100119
for (let handler of handlers) {
101-
handler(action, mwApi, stateBefore)
120+
handler(action, mwApiWithNext, internalState, stateBefore)
102121
}
103122
}
104123
}

packages/toolkit/src/query/core/buildMiddleware/polling.ts

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import type {
66
TimeoutId,
77
InternalHandlerBuilder,
88
ApiMiddlewareInternalHandler,
9+
InternalMiddlewareState,
910
} from './types'
1011

1112
export const buildPollingHandler: InternalHandlerBuilder = ({
@@ -20,26 +21,30 @@ export const buildPollingHandler: InternalHandlerBuilder = ({
2021
pollingInterval: number
2122
}> = {}
2223

23-
const handler: ApiMiddlewareInternalHandler = (action, mwApi) => {
24+
const handler: ApiMiddlewareInternalHandler = (
25+
action,
26+
mwApi,
27+
internalState
28+
) => {
2429
if (
2530
api.internalActions.updateSubscriptionOptions.match(action) ||
2631
api.internalActions.unsubscribeQueryResult.match(action)
2732
) {
28-
updatePollingInterval(action.payload, mwApi)
33+
updatePollingInterval(action.payload, mwApi, internalState)
2934
}
3035

3136
if (
3237
queryThunk.pending.match(action) ||
3338
(queryThunk.rejected.match(action) && action.meta.condition)
3439
) {
35-
updatePollingInterval(action.meta.arg, mwApi)
40+
updatePollingInterval(action.meta.arg, mwApi, internalState)
3641
}
3742

3843
if (
3944
queryThunk.fulfilled.match(action) ||
4045
(queryThunk.rejected.match(action) && !action.meta.condition)
4146
) {
42-
startNextPoll(action.meta.arg, mwApi)
47+
startNextPoll(action.meta.arg, mwApi, internalState)
4348
}
4449

4550
if (api.util.resetApiState.match(action)) {
@@ -49,11 +54,12 @@ export const buildPollingHandler: InternalHandlerBuilder = ({
4954

5055
function startNextPoll(
5156
{ queryCacheKey }: QuerySubstateIdentifier,
52-
api: SubMiddlewareApi
57+
api: SubMiddlewareApi,
58+
internalState: InternalMiddlewareState
5359
) {
5460
const state = api.getState()[reducerPath]
5561
const querySubState = state.queries[queryCacheKey]
56-
const subscriptions = state.subscriptions[queryCacheKey]
62+
const subscriptions = internalState.currentSubscriptions[queryCacheKey]
5763

5864
if (!querySubState || querySubState.status === QueryStatus.uninitialized)
5965
return
@@ -84,11 +90,12 @@ export const buildPollingHandler: InternalHandlerBuilder = ({
8490

8591
function updatePollingInterval(
8692
{ queryCacheKey }: QuerySubstateIdentifier,
87-
api: SubMiddlewareApi
93+
api: SubMiddlewareApi,
94+
internalState: InternalMiddlewareState
8895
) {
8996
const state = api.getState()[reducerPath]
9097
const querySubState = state.queries[queryCacheKey]
91-
const subscriptions = state.subscriptions[queryCacheKey]
98+
const subscriptions = internalState.currentSubscriptions[queryCacheKey]
9299

93100
if (!querySubState || querySubState.status === QueryStatus.uninitialized) {
94101
return
@@ -105,7 +112,7 @@ export const buildPollingHandler: InternalHandlerBuilder = ({
105112
const nextPollTimestamp = Date.now() + lowestPollingInterval
106113

107114
if (!currentPoll || nextPollTimestamp < currentPoll.nextPollTimestamp) {
108-
startNextPoll({ queryCacheKey }, api)
115+
startNextPoll({ queryCacheKey }, api, internalState)
109116
}
110117
}
111118

@@ -125,13 +132,15 @@ export const buildPollingHandler: InternalHandlerBuilder = ({
125132

126133
function findLowestPollingInterval(subscribers: Subscribers = {}) {
127134
let lowestPollingInterval = Number.POSITIVE_INFINITY
128-
for (const subscription of Object.values(subscribers)) {
129-
if (!!subscription.pollingInterval)
135+
for (let key in subscribers) {
136+
if (!!subscribers[key].pollingInterval) {
130137
lowestPollingInterval = Math.min(
131-
subscription.pollingInterval,
138+
subscribers[key].pollingInterval!,
132139
lowestPollingInterval
133140
)
141+
}
134142
}
143+
135144
return lowestPollingInterval
136145
}
137146
return handler

0 commit comments

Comments
 (0)