Skip to content

Commit 98cc7c6

Browse files
committed
feat: add flatMap
1 parent f86f13f commit 98cc7c6

File tree

2 files changed

+73
-11
lines changed

2 files changed

+73
-11
lines changed

src/index.test.ts

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ import {
1111
skip,
1212
merge,
1313
fromIterable,
14-
interval
14+
interval,
15+
flatMap
1516
} from '.'
1617

1718
describe('Stream Utils', () => {
@@ -38,6 +39,44 @@ describe('Stream Utils', () => {
3839
})
3940
})
4041

42+
describe('flatMap', () => {
43+
it('should transform and flatten chunks', async () => {
44+
const stream = fromIterable(['hello', 'world']).pipeThrough(
45+
flatMap(str => str.split(''))
46+
)
47+
const actual = await toArray(stream)
48+
expect(actual).toStrictEqual([
49+
'h',
50+
'e',
51+
'l',
52+
'l',
53+
'o',
54+
'w',
55+
'o',
56+
'r',
57+
'l',
58+
'd'
59+
])
60+
})
61+
62+
it('should handle async transformations', async () => {
63+
const stream = fromIterable(['a', 'b']).pipeThrough(
64+
flatMap(async str => {
65+
await delay(10)
66+
return [str, str.toUpperCase()]
67+
})
68+
)
69+
const actual = await toArray(stream)
70+
expect(actual).toStrictEqual(['a', 'A', 'b', 'B'])
71+
})
72+
73+
it('should handle empty arrays', async () => {
74+
const stream = fromIterable([1, 2, 3]).pipeThrough(flatMap(() => []))
75+
const actual = await toArray(stream)
76+
expect(actual).toStrictEqual([])
77+
})
78+
})
79+
4180
describe('filter', () => {
4281
it('should filter chunks based on predicate', async () => {
4382
const stream = fromIterable([1, 2, 3, 4, 5]).pipeThrough(

src/index.ts

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ type SyncOrAsync<T> = T | Promise<T>
88
* @returns A TransformStream that applies the transformation
99
* @example
1010
* ```ts
11-
* const stream = sourceStream.pipeThrough(map(x => x * 2));
11+
* const stream = readable.pipeThrough(map(x => x * 2));
1212
* ```
1313
*/
1414
export function map<T, R>(
@@ -21,6 +21,29 @@ export function map<T, R>(
2121
})
2222
}
2323

24+
/**
25+
* Map and flatten stream chunks
26+
*
27+
* @category Transformation
28+
* @param fn - The transformation function that returns an array
29+
* @returns A TransformStream that applies the transformation and flattens the result
30+
* @example
31+
* ```ts
32+
* const stream = readable.pipeThrough(flatMap(word => word.split(''));
33+
* ```
34+
*/
35+
export function flatMap<T, R>(
36+
fn: (chunk: T) => SyncOrAsync<R[]>
37+
): TransformStream<T, R> {
38+
const mapper = map<T, R[]>(fn)
39+
const flattener = flatten<R>()
40+
mapper.readable.pipeThrough(flattener)
41+
return {
42+
readable: flattener.readable,
43+
writable: mapper.writable
44+
}
45+
}
46+
2447
/**
2548
* Filter function for filtering stream chunks
2649
*
@@ -29,7 +52,7 @@ export function map<T, R>(
2952
* @returns A TransformStream that only passes chunks that satisfy the predicate
3053
* @example
3154
* ```ts
32-
* const stream = sourceStream.pipeThrough(filter(x => x > 10));
55+
* const stream = readable.pipeThrough(filter(x => x > 10));
3356
* ```
3457
*/
3558
export function filter<T>(
@@ -52,7 +75,7 @@ export function filter<T>(
5275
* @returns A TransformStream that passes chunks unchanged after executing the function
5376
* @example
5477
* ```ts
55-
* const stream = sourceStream.pipeThrough(tap(x => console.log(`Processing: ${x}`)));
78+
* const stream = readable.pipeThrough(tap(x => console.log(`Processing: ${x}`)));
5679
* ```
5780
*/
5881
export function tap<T>(
@@ -74,7 +97,7 @@ export function tap<T>(
7497
* @returns A promise that resolves to an array of all chunks
7598
* @example
7699
* ```ts
77-
* const result = await toArray(sourceStream);
100+
* const result = await toArray(readable);
78101
* console.log(result); // [1, 2, 3, ...]
79102
* ```
80103
*/
@@ -98,8 +121,8 @@ export async function toArray<T>(stream: ReadableStream<T>): Promise<T[]> {
98121
* @returns A TransformStream that groups chunks into batches
99122
* @example
100123
* ```ts
101-
* const stream = sourceStream.pipeThrough(batch(3));
102-
* // If sourceStream emits [1, 2, 3, 4, 5], the result will be [[1, 2, 3], [4, 5]]
124+
* const stream = readable.pipeThrough(batch(3));
125+
* // If readable emits [1, 2, 3, 4, 5], the result will be [[1, 2, 3], [4, 5]]
103126
* ```
104127
*/
105128
export function batch<T>(size: number): TransformStream<T, T[]> {
@@ -129,8 +152,8 @@ export function batch<T>(size: number): TransformStream<T, T[]> {
129152
* @returns A TransformStream that flattens arrays into individual chunks
130153
* @example
131154
* ```ts
132-
* const stream = sourceStream.pipeThrough(flatten());
133-
* // If sourceStream emits [[1, 2], [3, 4]], the result will be [1, 2, 3, 4]
155+
* const stream = readable.pipeThrough(flatten());
156+
* // If readable emits [[1, 2], [3, 4]], the result will be [1, 2, 3, 4]
134157
* ```
135158
*/
136159
export function flatten<T>(): TransformStream<T[], T> {
@@ -151,7 +174,7 @@ export function flatten<T>(): TransformStream<T[], T> {
151174
* @returns A TransformStream that limits the number of chunks
152175
* @example
153176
* ```ts
154-
* const stream = sourceStream.pipeThrough(take(3));
177+
* const stream = readable.pipeThrough(take(3));
155178
* // Only the first 3 chunks will pass through
156179
* ```
157180
*/
@@ -181,7 +204,7 @@ export function take<T>(limit: number): TransformStream<T, T> {
181204
* @returns A TransformStream that skips the specified number of chunks
182205
* @example
183206
* ```ts
184-
* const stream = sourceStream.pipeThrough(skip(2));
207+
* const stream = readable.pipeThrough(skip(2));
185208
* // The first 2 chunks will be skipped
186209
* ```
187210
*/

0 commit comments

Comments
 (0)