Skip to content

Commit 9973e0f

Browse files
authored
feat: maxChunks (#9184)
1 parent 719db35 commit 9973e0f

File tree

3 files changed

+143
-5
lines changed

3 files changed

+143
-5
lines changed

docs/reference/streamedQuery.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,10 @@ const query = queryOptions({
3232
- Defaults to `'reset'`
3333
- When set to `'reset'`, the query will erase all data and go back into `pending` state.
3434
- When set to `'append'`, data will be appended to existing data.
35-
- When set to `'replace'`, data will be written to the cache at the end of the stream.
35+
- When set to `'replace'`, all data will be written to the cache once the stream ends.
36+
- `maxChunks?: number`
37+
- Optional
38+
- The maximum number of chunks to keep in the cache.
39+
- Defaults to `undefined`, meaning all chunks will be kept.
40+
- If `undefined` or `0`, the number of chunks is unlimited.
41+
- If the number of chunks exceeds `maxChunks`, the oldest chunk will be removed.

packages/query-core/src/__tests__/streamedQuery.test.tsx

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,4 +349,129 @@ describe('streamedQuery', () => {
349349

350350
unsubscribe()
351351
})
352+
353+
test('should support maxChunks', async () => {
354+
const key = queryKey()
355+
const observer = new QueryObserver(queryClient, {
356+
queryKey: key,
357+
queryFn: streamedQuery({
358+
queryFn: () => createAsyncNumberGenerator(3),
359+
maxChunks: 2,
360+
}),
361+
})
362+
363+
const unsubscribe = observer.subscribe(vi.fn())
364+
365+
expect(observer.getCurrentResult()).toMatchObject({
366+
status: 'pending',
367+
fetchStatus: 'fetching',
368+
data: undefined,
369+
})
370+
371+
await vi.advanceTimersByTimeAsync(50)
372+
373+
expect(observer.getCurrentResult()).toMatchObject({
374+
status: 'success',
375+
fetchStatus: 'fetching',
376+
data: [0],
377+
})
378+
379+
await vi.advanceTimersByTimeAsync(50)
380+
381+
expect(observer.getCurrentResult()).toMatchObject({
382+
status: 'success',
383+
fetchStatus: 'fetching',
384+
data: [0, 1],
385+
})
386+
387+
await vi.advanceTimersByTimeAsync(50)
388+
389+
expect(observer.getCurrentResult()).toMatchObject({
390+
status: 'success',
391+
fetchStatus: 'idle',
392+
data: [1, 2],
393+
})
394+
395+
unsubscribe()
396+
})
397+
398+
test('maxChunks with append refetch', async () => {
399+
const key = queryKey()
400+
const observer = new QueryObserver(queryClient, {
401+
queryKey: key,
402+
queryFn: streamedQuery({
403+
queryFn: () => createAsyncNumberGenerator(3),
404+
maxChunks: 2,
405+
refetchMode: 'append',
406+
}),
407+
})
408+
409+
const unsubscribe = observer.subscribe(vi.fn())
410+
411+
expect(observer.getCurrentResult()).toMatchObject({
412+
status: 'pending',
413+
fetchStatus: 'fetching',
414+
data: undefined,
415+
})
416+
417+
await vi.advanceTimersByTimeAsync(50)
418+
419+
expect(observer.getCurrentResult()).toMatchObject({
420+
status: 'success',
421+
fetchStatus: 'fetching',
422+
data: [0],
423+
})
424+
425+
await vi.advanceTimersByTimeAsync(50)
426+
427+
expect(observer.getCurrentResult()).toMatchObject({
428+
status: 'success',
429+
fetchStatus: 'fetching',
430+
data: [0, 1],
431+
})
432+
433+
await vi.advanceTimersByTimeAsync(50)
434+
435+
expect(observer.getCurrentResult()).toMatchObject({
436+
status: 'success',
437+
fetchStatus: 'idle',
438+
data: [1, 2],
439+
})
440+
441+
void observer.refetch()
442+
443+
await vi.advanceTimersByTimeAsync(10)
444+
445+
expect(observer.getCurrentResult()).toMatchObject({
446+
status: 'success',
447+
fetchStatus: 'fetching',
448+
data: [1, 2],
449+
})
450+
451+
await vi.advanceTimersByTimeAsync(40)
452+
453+
expect(observer.getCurrentResult()).toMatchObject({
454+
status: 'success',
455+
fetchStatus: 'fetching',
456+
data: [2, 0],
457+
})
458+
459+
await vi.advanceTimersByTimeAsync(50)
460+
461+
expect(observer.getCurrentResult()).toMatchObject({
462+
status: 'success',
463+
fetchStatus: 'fetching',
464+
data: [0, 1],
465+
})
466+
467+
await vi.advanceTimersByTimeAsync(50)
468+
469+
expect(observer.getCurrentResult()).toMatchObject({
470+
status: 'success',
471+
fetchStatus: 'idle',
472+
data: [1, 2],
473+
})
474+
475+
unsubscribe()
476+
})
352477
})

packages/query-core/src/streamedQuery.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { addToEnd } from './utils'
12
import type { QueryFunction, QueryFunctionContext, QueryKey } from './types'
23

34
/**
@@ -9,19 +10,25 @@ import type { QueryFunction, QueryFunctionContext, QueryKey } from './types'
910
* @param refetchMode - Defines how re-fetches are handled.
1011
* Defaults to `'reset'`, erases all data and puts the query back into `pending` state.
1112
* Set to `'append'` to append new data to the existing data.
12-
* Set to `'replace'` to write the data to the cache at the end of the stream.
13+
* Set to `'replace'` to write all data to the cache once the stream ends.
14+
* @param maxChunks - The maximum number of chunks to keep in the cache.
15+
* Defaults to `undefined`, meaning all chunks will be kept.
16+
* If `undefined` or `0`, the number of chunks is unlimited.
17+
 * If the number of chunks exceeds `maxChunks`, the oldest chunk will be removed.
1318
*/
1419
export function streamedQuery<
1520
TQueryFnData = unknown,
1621
TQueryKey extends QueryKey = QueryKey,
1722
>({
1823
queryFn,
1924
refetchMode = 'reset',
25+
maxChunks,
2026
}: {
2127
queryFn: (
2228
context: QueryFunctionContext<TQueryKey>,
2329
) => AsyncIterable<TQueryFnData> | Promise<AsyncIterable<TQueryFnData>>
2430
refetchMode?: 'append' | 'reset' | 'replace'
31+
maxChunks?: number
2532
}): QueryFunction<Array<TQueryFnData>, TQueryKey> {
2633
return async (context) => {
2734
const query = context.client
@@ -38,7 +45,7 @@ export function streamedQuery<
3845
})
3946
}
4047

41-
const result: Array<TQueryFnData> = []
48+
let result: Array<TQueryFnData> = []
4249
const stream = await queryFn(context)
4350

4451
for await (const chunk of stream) {
@@ -51,11 +58,11 @@ export function streamedQuery<
5158
context.client.setQueryData<Array<TQueryFnData>>(
5259
context.queryKey,
5360
(prev = []) => {
54-
return prev.concat([chunk])
61+
return addToEnd(prev, chunk, maxChunks)
5562
},
5663
)
5764
}
58-
result.push(chunk)
65+
result = addToEnd(result, chunk, maxChunks)
5966
}
6067

6168
// finalize result: replace-refetching needs to write to the cache

0 commit comments

Comments
 (0)