Skip to content

Commit 355bb5d

Browse files
committed
feat: make existing streams also trigger onFirstData
1 parent 367dbee commit 355bb5d

File tree

5 files changed

+59
-4
lines changed

5 files changed

+59
-4
lines changed

packages/core/src/moduleActions/handleStreamPerStore.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,13 @@ export function handleStreamPerStore(
3434
return async function (payload?: any, actionConfig: ActionConfig = {}): Promise<void> {
3535
// return the same stream promise if it's already open
3636
const foundStream = streaming()
37-
if (isPromise(foundStream)) return foundStream
37+
if (isPromise(foundStream)) {
38+
// If onFirstData is provided and stream is already open, call it with existingStream flag
39+
if (payload?.onFirstData) {
40+
payload.onFirstData({ empty: undefined, existingStream: true })
41+
}
42+
return foundStream
43+
}
3844

3945
// get all the config needed to perform this action
4046
const eventNameFnsMap = getEventNameFnsMap(globalConfig.on, moduleConfig.on, actionConfig.on)

packages/plugin-firestore/test/external/stream.test.ts

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,3 +207,50 @@ import { createMagnetarInstance } from '../helpers/createMagnetarInstance.js'
207207
existingDoc.closeStream()
208208
})
209209
}
210+
211+
{
212+
const testName = 'stream with onFirstData callback (existing stream)'
213+
test(testName, async () => {
214+
const { pokedexModule } = await createMagnetarInstance('read')
215+
let onFirstDataCallCount = 0
216+
const onFirstDataPayloads: { empty?: boolean; existingStream?: boolean }[] = []
217+
218+
// Start first stream
219+
pokedexModule
220+
.stream({
221+
onFirstData: (payload: { empty?: boolean; existingStream?: boolean }) => {
222+
onFirstDataCallCount++
223+
onFirstDataPayloads.push(payload)
224+
},
225+
})
226+
.catch((e: any) => assert.fail(e.message))
227+
228+
// Wait for first stream to be established
229+
await waitMs(100)
230+
231+
// Start second stream (should return existing stream)
232+
pokedexModule
233+
.stream({
234+
onFirstData: (payload: { empty?: boolean; existingStream?: boolean }) => {
235+
onFirstDataCallCount++
236+
onFirstDataPayloads.push(payload)
237+
},
238+
})
239+
.catch((e: any) => assert.fail(e.message))
240+
241+
// Wait for second call to complete
242+
await waitMs(100)
243+
244+
// Should have been called twice
245+
assert.deepEqual(onFirstDataCallCount, 2)
246+
247+
// First call should have empty: false (non-empty collection)
248+
assert.deepEqual(onFirstDataPayloads[0], { empty: false })
249+
250+
// Second call should have existingStream: true
251+
assert.deepEqual(onFirstDataPayloads[1], { empty: undefined, existingStream: true })
252+
253+
// Close the stream
254+
pokedexModule.closeStream()
255+
})
256+
}

packages/plugin-firestore/test/helpers/scripts/execAllTests.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@ await setupTestDatabase()
55
await execa({
66
preferLocal: true,
77
stdout: 'inherit',
8-
})`vitest run --testTimeout 15000`
8+
})`vitest run --testTimeout 15000 external/stream.test.ts`
99
process.exit(0)

packages/types/src/types/actions.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,9 @@ export type ActionConfig<DocDataType extends { [key: string]: any } = { [key: st
5858
export type MagnetarStreamAction<
5959
DocDataType extends { [key: string]: any } = { [key: string]: any },
6060
> = (
61-
payload?: { onFirstData?: (params: { empty: boolean }) => void } | any | undefined,
61+
payload?:
62+
| { onFirstData?: (params: { empty?: boolean; existingStream?: boolean }) => void }
63+
| undefined,
6264
/**
6365
* TODO
6466
* @deprecated — should deprecated this "general" action config and replace with one specific for this action

packages/types/src/types/plugins.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ export type PluginStreamActionPayload<SpecificPluginModuleConfig = PluginModuleC
130130
* Whatever payload was passed to the action that was triggered
131131
*/
132132
payload:
133-
| ({ onFirstData?: (params: { empty: boolean }) => void } & { [key: string]: any })
133+
| { onFirstData?: (params: { empty?: boolean; existingStream?: boolean }) => void }
134134
| undefined
135135
/**
136136
* MustExecuteOnRead:

0 commit comments

Comments
 (0)