Skip to content

Commit 72206a2

Browse files
authored
Merge pull request #3709 from reduxjs/pr/running_subscription
2 parents b7a8ede + 6da44eb commit 72206a2

22 files changed

+317
-168
lines changed

packages/toolkit/src/query/core/buildInitiate.ts

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import type { BaseQueryError, QueryReturnValue } from '../baseQueryTypes'
2020
import type { QueryResultSelectorResult } from './buildSelectors'
2121
import type { Dispatch } from 'redux'
2222
import { isNotNullish } from '../utils/isNotNullish'
23+
import { countObjectKeys } from '../utils/countObjectKeys'
2324

2425
declare module './module' {
2526
export interface ApiEndpointQuery<
@@ -265,19 +266,18 @@ export function buildInitiate({
265266
function middlewareWarning(dispatch: Dispatch) {
266267
if (process.env.NODE_ENV !== 'production') {
267268
if ((middlewareWarning as any).triggered) return
268-
const registered:
269-
| ReturnType<typeof api.internalActions.internal_probeSubscription>
270-
| boolean = dispatch(
271-
api.internalActions.internal_probeSubscription({
272-
queryCacheKey: 'DOES_NOT_EXIST',
273-
requestId: 'DUMMY_REQUEST_ID',
274-
})
269+
const returnedValue = dispatch(
270+
api.internalActions.internal_getRTKQSubscriptions()
275271
)
276272

277273
;(middlewareWarning as any).triggered = true
278274

279-
// The RTKQ middleware _should_ always return a boolean for `probeSubscription`
280-
if (typeof registered !== 'boolean') {
275+
// The RTKQ middleware should return the internal state object,
276+
// but it should _not_ be the action object.
277+
if (
278+
typeof returnedValue !== 'object' ||
279+
typeof returnedValue?.type === 'string'
280+
) {
281281
// Otherwise, must not have been added
282282
throw new Error(
283283
`Warning: Middleware for RTK-Query API at reducerPath "${api.reducerPath}" has not been added to the store.
@@ -395,7 +395,7 @@ You must add the middleware for RTK-Query to function correctly!`
395395

396396
statePromise.then(() => {
397397
delete running[queryCacheKey]
398-
if (!Object.keys(running).length) {
398+
if (!countObjectKeys(running)) {
399399
runningQueries.delete(dispatch)
400400
}
401401
})
@@ -443,7 +443,7 @@ You must add the middleware for RTK-Query to function correctly!`
443443
running[requestId] = ret
444444
ret.then(() => {
445445
delete running[requestId]
446-
if (!Object.keys(running).length) {
446+
if (!countObjectKeys(running)) {
447447
runningMutations.delete(dispatch)
448448
}
449449
})
@@ -452,7 +452,7 @@ You must add the middleware for RTK-Query to function correctly!`
452452
ret.then(() => {
453453
if (running[fixedCacheKey] === ret) {
454454
delete running[fixedCacheKey]
455-
if (!Object.keys(running).length) {
455+
if (!countObjectKeys(running)) {
456456
runningMutations.delete(dispatch)
457457
}
458458
}

packages/toolkit/src/query/core/buildMiddleware/batchActions.ts

Lines changed: 60 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
1-
import type { InternalHandlerBuilder } from './types'
1+
import type { InternalHandlerBuilder, SubscriptionSelectors } from './types'
22
import type { SubscriptionState } from '../apiState'
33
import { produceWithPatches } from 'immer'
44
import type { Action } from '@reduxjs/toolkit'
5+
import { countObjectKeys } from '../../utils/countObjectKeys'
56

67
export const buildBatchedActionsHandler: InternalHandlerBuilder<
7-
[actionShouldContinue: boolean, subscriptionExists: boolean]
8+
[actionShouldContinue: boolean, returnValue: SubscriptionSelectors | boolean]
89
> = ({ api, queryThunk, internalState }) => {
910
const subscriptionsPrefix = `${api.reducerPath}/subscriptions`
1011

1112
let previousSubscriptions: SubscriptionState =
1213
null as unknown as SubscriptionState
1314

14-
let dispatchQueued = false
15+
let updateSyncTimer: ReturnType<typeof window.setTimeout> | null = null
1516

1617
const { updateSubscriptionOptions, unsubscribeQueryResult } =
1718
api.internalActions
@@ -45,13 +46,23 @@ export const buildBatchedActionsHandler: InternalHandlerBuilder<
4546
const {
4647
meta: { arg, requestId },
4748
} = action
49+
const substate = (mutableState[arg.queryCacheKey] ??= {})
50+
substate[`${requestId}_running`] = {}
4851
if (arg.subscribe) {
49-
const substate = (mutableState[arg.queryCacheKey] ??= {})
5052
substate[requestId] =
5153
arg.subscriptionOptions ?? substate[requestId] ?? {}
52-
53-
return true
5454
}
55+
return true
56+
}
57+
let mutated = false
58+
if (
59+
queryThunk.fulfilled.match(action) ||
60+
queryThunk.rejected.match(action)
61+
) {
62+
const state = mutableState[action.meta.arg.queryCacheKey] || {}
63+
const key = `${action.meta.requestId}_running`
64+
mutated ||= !!state[key]
65+
delete state[key]
5566
}
5667
if (queryThunk.rejected.match(action)) {
5768
const {
@@ -62,17 +73,37 @@ export const buildBatchedActionsHandler: InternalHandlerBuilder<
6273
substate[requestId] =
6374
arg.subscriptionOptions ?? substate[requestId] ?? {}
6475

65-
return true
76+
mutated = true
6677
}
6778
}
6879

69-
return false
80+
return mutated
81+
}
82+
83+
const getSubscriptions = () => internalState.currentSubscriptions
84+
const getSubscriptionCount = (queryCacheKey: string) => {
85+
const subscriptions = getSubscriptions()
86+
const subscriptionsForQueryArg = subscriptions[queryCacheKey] ?? {}
87+
return countObjectKeys(subscriptionsForQueryArg)
88+
}
89+
const isRequestSubscribed = (queryCacheKey: string, requestId: string) => {
90+
const subscriptions = getSubscriptions()
91+
return !!subscriptions?.[queryCacheKey]?.[requestId]
92+
}
93+
94+
const subscriptionSelectors: SubscriptionSelectors = {
95+
getSubscriptions,
96+
getSubscriptionCount,
97+
isRequestSubscribed,
7098
}
7199

72100
return (
73101
action,
74102
mwApi
75-
): [actionShouldContinue: boolean, hasSubscription: boolean] => {
103+
): [
104+
actionShouldContinue: boolean,
105+
result: SubscriptionSelectors | boolean
106+
] => {
76107
if (!previousSubscriptions) {
77108
// Initialize it the first time this handler runs
78109
previousSubscriptions = JSON.parse(
@@ -82,16 +113,16 @@ export const buildBatchedActionsHandler: InternalHandlerBuilder<
82113

83114
if (api.util.resetApiState.match(action)) {
84115
previousSubscriptions = internalState.currentSubscriptions = {}
116+
updateSyncTimer = null
85117
return [true, false]
86118
}
87119

88120
// Intercept requests by hooks to see if they're subscribed
89-
// Necessary because we delay updating store state to the end of the tick
90-
if (api.internalActions.internal_probeSubscription.match(action)) {
91-
const { queryCacheKey, requestId } = action.payload
92-
const hasSubscription =
93-
!!internalState.currentSubscriptions[queryCacheKey]?.[requestId]
94-
return [false, hasSubscription]
121+
// We return the internal state reference so that hooks
122+
// can do their own checks to see if they're still active.
123+
// It's stupid and hacky, but it does cut down on some dispatch calls.
124+
if (api.internalActions.internal_getRTKQSubscriptions.match(action)) {
125+
return [false, subscriptionSelectors]
95126
}
96127

97128
// Update subscription data based on this action
@@ -100,9 +131,16 @@ export const buildBatchedActionsHandler: InternalHandlerBuilder<
100131
action
101132
)
102133

134+
let actionShouldContinue = true
135+
103136
if (didMutate) {
104-
if (!dispatchQueued) {
105-
queueMicrotask(() => {
137+
if (!updateSyncTimer) {
138+
// We only use the subscription state for the Redux DevTools at this point,
139+
// as the real data is kept here in the middleware.
140+
// Given that, we can throttle synchronizing this state significantly to
141+
// save on overall perf.
142+
// In 1.9, it was updated in a microtask, but now we do it at most every 500ms.
143+
updateSyncTimer = setTimeout(() => {
106144
// Deep clone the current subscription data
107145
const newSubscriptions: SubscriptionState = JSON.parse(
108146
JSON.stringify(internalState.currentSubscriptions)
@@ -117,25 +155,23 @@ export const buildBatchedActionsHandler: InternalHandlerBuilder<
117155
mwApi.next(api.internalActions.subscriptionsUpdated(patches))
118156
// Save the cloned state for later reference
119157
previousSubscriptions = newSubscriptions
120-
dispatchQueued = false
121-
})
122-
dispatchQueued = true
158+
updateSyncTimer = null
159+
}, 500)
123160
}
124161

125162
const isSubscriptionSliceAction =
126163
typeof action.type == 'string' &&
127164
!!action.type.startsWith(subscriptionsPrefix)
165+
128166
const isAdditionalSubscriptionAction =
129167
queryThunk.rejected.match(action) &&
130168
action.meta.condition &&
131169
!!action.meta.arg.subscribe
132170

133-
const actionShouldContinue =
171+
actionShouldContinue =
134172
!isSubscriptionSliceAction && !isAdditionalSubscriptionAction
135-
136-
return [actionShouldContinue, false]
137173
}
138174

139-
return [true, false]
175+
return [actionShouldContinue, false]
140176
}
141177
}

packages/toolkit/src/query/core/buildMiddleware/index.ts

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ export function buildMiddleware<
7171
>),
7272
internalState,
7373
refetchQuery,
74+
isThisApiSliceAction,
7475
}
7576

7677
const handlers = handlerBuilders.map((build) => build(builderArgs))
@@ -93,18 +94,15 @@ export function buildMiddleware<
9394

9495
const stateBefore = mwApi.getState()
9596

96-
const [actionShouldContinue, hasSubscription] = batchedActionsHandler(
97-
action,
98-
mwApiWithNext,
99-
stateBefore
100-
)
97+
const [actionShouldContinue, internalProbeResult] =
98+
batchedActionsHandler(action, mwApiWithNext, stateBefore)
10199

102100
let res: any
103101

104102
if (actionShouldContinue) {
105103
res = next(action)
106104
} else {
107-
res = hasSubscription
105+
res = internalProbeResult
108106
}
109107

110108
if (!!mwApi.getState()[reducerPath]) {

packages/toolkit/src/query/core/buildMiddleware/invalidationByTags.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ import type {
99
SubMiddlewareApi,
1010
InternalHandlerBuilder,
1111
ApiMiddlewareInternalHandler,
12+
InternalMiddlewareState,
1213
} from './types'
14+
import { countObjectKeys } from '../../utils/countObjectKeys'
1315

1416
export const buildInvalidationByTagsHandler: InternalHandlerBuilder = ({
1517
reducerPath,
@@ -19,6 +21,7 @@ export const buildInvalidationByTagsHandler: InternalHandlerBuilder = ({
1921
api,
2022
assertTagType,
2123
refetchQuery,
24+
internalState,
2225
}) => {
2326
const { removeQueryResult } = api.internalActions
2427
const isThunkActionWithTags = isAnyOf(
@@ -35,7 +38,8 @@ export const buildInvalidationByTagsHandler: InternalHandlerBuilder = ({
3538
endpointDefinitions,
3639
assertTagType
3740
),
38-
mwApi
41+
mwApi,
42+
internalState
3943
)
4044
}
4145

@@ -49,16 +53,19 @@ export const buildInvalidationByTagsHandler: InternalHandlerBuilder = ({
4953
undefined,
5054
assertTagType
5155
),
52-
mwApi
56+
mwApi,
57+
internalState
5358
)
5459
}
5560
}
5661

5762
function invalidateTags(
5863
tags: readonly FullTagDescription<string>[],
59-
mwApi: SubMiddlewareApi
64+
mwApi: SubMiddlewareApi,
65+
internalState: InternalMiddlewareState
6066
) {
6167
const rootState = mwApi.getState()
68+
6269
const state = rootState[reducerPath]
6370

6471
const toInvalidate = api.util.selectInvalidatedBy(rootState, tags)
@@ -67,10 +74,11 @@ export const buildInvalidationByTagsHandler: InternalHandlerBuilder = ({
6774
const valuesArray = Array.from(toInvalidate.values())
6875
for (const { queryCacheKey } of valuesArray) {
6976
const querySubState = state.queries[queryCacheKey]
70-
const subscriptionSubState = state.subscriptions[queryCacheKey] ?? {}
77+
const subscriptionSubState =
78+
internalState.currentSubscriptions[queryCacheKey] ?? {}
7179

7280
if (querySubState) {
73-
if (Object.keys(subscriptionSubState).length === 0) {
81+
if (countObjectKeys(subscriptionSubState) === 0) {
7482
mwApi.dispatch(
7583
removeQueryResult({
7684
queryCacheKey: queryCacheKey as QueryCacheKey,

packages/toolkit/src/query/core/buildMiddleware/types.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@ export interface InternalMiddlewareState {
3232
currentSubscriptions: SubscriptionState
3333
}
3434

35+
export interface SubscriptionSelectors {
36+
getSubscriptions: () => SubscriptionState
37+
getSubscriptionCount: (queryCacheKey: string) => number
38+
isRequestSubscribed: (queryCacheKey: string, requestId: string) => boolean
39+
}
40+
3541
export interface BuildMiddlewareInput<
3642
Definitions extends EndpointDefinitions,
3743
ReducerPath extends string,
@@ -61,6 +67,7 @@ export interface BuildSubMiddlewareInput
6167
queryCacheKey: string,
6268
override?: Partial<QueryThunkArg>
6369
): AsyncThunkAction<ThunkResult, QueryThunkArg, {}>
70+
isThisApiSliceAction: (action: Action) => boolean
6471
}
6572

6673
export type SubMiddlewareBuilder = (

packages/toolkit/src/query/core/buildMiddleware/windowEventHandling.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import type {
66
InternalHandlerBuilder,
77
SubMiddlewareApi,
88
} from './types'
9+
import { countObjectKeys } from '../../utils/countObjectKeys'
910

1011
export const buildWindowEventHandler: InternalHandlerBuilder = ({
1112
reducerPath,
@@ -50,7 +51,7 @@ export const buildWindowEventHandler: InternalHandlerBuilder = ({
5051
state.config[type])
5152

5253
if (shouldRefetch) {
53-
if (Object.keys(subscriptionSubState).length === 0) {
54+
if (countObjectKeys(subscriptionSubState) === 0) {
5455
api.dispatch(
5556
removeQueryResult({
5657
queryCacheKey: queryCacheKey as QueryCacheKey,

packages/toolkit/src/query/core/buildSlice.ts

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { PayloadAction, UnknownAction } from '@reduxjs/toolkit'
1+
import type { Action, PayloadAction, UnknownAction } from '@reduxjs/toolkit'
22
import {
33
combineReducers,
44
createAction,
@@ -147,12 +147,9 @@ export function buildSlice({
147147
builder
148148
.addCase(queryThunk.pending, (draft, { meta, meta: { arg } }) => {
149149
const upserting = isUpsertQuery(arg)
150-
if (arg.subscribe || upserting) {
151-
// only initialize substate if we want to subscribe to it
152-
draft[arg.queryCacheKey] ??= {
153-
status: QueryStatus.uninitialized,
154-
endpointName: arg.endpointName,
155-
}
150+
draft[arg.queryCacheKey] ??= {
151+
status: QueryStatus.uninitialized,
152+
endpointName: arg.endpointName,
156153
}
157154

158155
updateQuerySubstateIfExists(draft, arg.queryCacheKey, (substate) => {
@@ -446,12 +443,7 @@ export function buildSlice({
446443
) {
447444
// Dummy
448445
},
449-
internal_probeSubscription(
450-
d,
451-
a: PayloadAction<{ queryCacheKey: string; requestId: string }>
452-
) {
453-
// dummy
454-
},
446+
internal_getRTKQSubscriptions() {},
455447
},
456448
})
457449

0 commit comments

Comments
 (0)