Skip to content

Commit bcd02af

Browse files
committed
fix: add initial reducer value; fix its types
1 parent 6150384 commit bcd02af

File tree

2 files changed

+16
-18
lines changed

2 files changed

+16
-18
lines changed

src/index.test.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ describe('Stream Utils', () => {
317317
})
318318

319319
afterEach(() => {
320-
vi.restoreAllMocks()
320+
vi.useRealTimers()
321321
})
322322

323323
it('should emit values at specified intervals', async () => {
@@ -366,7 +366,7 @@ describe('Stream Utils', () => {
366366
describe('reduce', () => {
367367
it('should reduce stream to a single value', async () => {
368368
const stream = fromIterable([1, 2, 3, 4, 5]).pipeThrough(
369-
reduce((a, b) => a + b)
369+
reduce((a, b) => a + b, 0)
370370
)
371371
const [sum] = await toArray(stream)
372372
expect(sum).toBe(15)
@@ -377,22 +377,22 @@ describe('Stream Utils', () => {
377377
reduce(async (a, b) => {
378378
await delay(10)
379379
return a + b
380-
})
380+
}, 0)
381381
)
382382
const [sum] = await toArray(stream)
383383
expect(sum).toBe(6)
384384
})
385385

386386
it('should handle empty streams', async () => {
387387
const stream = fromIterable<number>([]).pipeThrough(
388-
reduce((a, b) => a + b)
388+
reduce((a, b) => a + b, 0)
389389
)
390390
const result = await toArray(stream)
391391
expect(result).toStrictEqual([])
392392
})
393393

394394
it('should handle single value streams', async () => {
395-
const stream = fromIterable([42]).pipeThrough(reduce((a, b) => a + b))
395+
const stream = fromIterable([42]).pipeThrough(reduce((a, b) => a + b, 0))
396396
const [result] = await toArray(stream)
397397
expect(result).toBe(42)
398398
})

src/index.ts

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,7 @@ export function interval(period: number): ReadableStream<number> {
351351
*
352352
* @category Transformation
353353
* @param reducer - The reducer function to apply to each chunk
354+
* @param initialValue - The initial value
354355
* @returns A TransformStream that emits the final reduced value when the stream closes
355356
* @example
356357
* ```ts
@@ -359,20 +360,17 @@ export function interval(period: number): ReadableStream<number> {
359360
* const [sum] = await toArray(stream); // Gets the single reduced value
360361
* ```
361362
*/
362-
export function reduce<T>(
363-
reducer: (accumulator: T, chunk: T) => SyncOrAsync<T>
364-
): TransformStream<T, T> {
365-
let accumulator: T | undefined
363+
export function reduce<T, R>(
364+
reducer: (accumulator: R, chunk: T) => SyncOrAsync<R>,
365+
initialValue: R
366+
): TransformStream<T, R> {
367+
let accumulator: R | undefined
366368

367-
return new TransformStream<T, T>({
368-
async transform(chunk: T, controller: TransformStreamDefaultController<T>) {
369-
if (accumulator === undefined) {
370-
accumulator = chunk
371-
} else {
372-
accumulator = await reducer(accumulator, chunk)
373-
}
369+
return new TransformStream<T, R>({
370+
async transform(chunk) {
371+
accumulator = await reducer(accumulator ?? initialValue, chunk)
374372
},
375-
flush(controller: TransformStreamDefaultController<T>) {
373+
flush(controller) {
376374
if (accumulator !== undefined) {
377375
controller.enqueue(accumulator)
378376
}
@@ -400,7 +398,7 @@ export function scan<T, R>(
400398
): TransformStream<T, R> {
401399
let accumulator = initialValue
402400
return new TransformStream<T, R>({
403-
async transform(chunk: T, controller: TransformStreamDefaultController<R>) {
401+
async transform(chunk, controller) {
404402
accumulator = await scanner(accumulator, chunk)
405403
controller.enqueue(accumulator)
406404
}

0 commit comments

Comments
 (0)