A collection of helper methods for WebStreams, inspired by ReactiveExtensions. Being built on-top of ReadableStream, we can have a reactive pipeline with non-blocking back-pressure built-in.
Requires support for ReadableStream (use a polyfill if not available).
Subjects require support for WritableStream. Requires support for async/await.
npm install web-streams-extensions
import { from, pipe, map, filter, toArray } from 'web-streams-extensions';
// Create a stream from an array
const stream = pipe(
from([1, 2, 3, 4, 5, 6]),
filter(x => x % 2 === 0),
map(x => x * 2)
);
const result = await toArray(stream);
console.log(result); // [4, 8, 12]
from(src: Iterable | AsyncIterable | (() => Iterable | AsyncIterable) | ReadableLike): ReadableStream
Creates a ReadableStream from various input sources. Supports iterables, async iterables, promises, functions returning iterables, and ReadableLike objects.
It will not create an iterator until the resulting stream is read from (lazy evaluation).
from([1, 2, 3, 4])
from(function*() { yield 1; yield 2; yield 3; yield 4; })
from(async function*() { yield 1; yield 2; yield 3; yield await Promise.resolve(4); })
from(Promise.resolve([1, 2, 3, 4]))
Creates a ReadableStream where the chunks will be the in-order arguments passed to it.
of(1, "foo", () => "bar", {})
// Emits: 1, "foo", () => "bar", {}
Concatenates several streams together in the order given. Each stream is read to completion before moving to the next.
It will not read from the streams until the result stream is read from (lazy evaluation).
let inputA = [1, 2];
let inputB = [3, 4];
let expected = [1, 2, 3, 4];
let stream = concat(from(inputA), from(inputB));
let result = await toArray(stream);
Defers the creation of a ReadableStream until it's actually read from. Useful for lazy initialization.
let input = [1, 2, 3, 4];
let expected = [1, 2, 3, 4];
let result = await toArray(defer(() => Promise.resolve(from(input))));
Consumes a ReadableStream and returns all values as an array. The stream will be read to completion.
let input = [1, 2, 3, 4];
let expected = [1, 2, 3, 4];
let result = await toArray(from([1, 2, 3, 4]));
Consumes a ReadableStream and returns the last value emitted. The stream will be read to completion.
let input = [1, 2, 3, 4];
let expected = 4;
let result = await toPromise(from([1, 2, 3, 4]));
Immediately begins to read from src, passing each chunk to the next
callback and awaiting if it returns a promise.
Once the source signals the end of the stream, complete
is called.
If the source stream throws an error, this is passed to the error
callback.
Returns a subscription object with an unsubscribe
method to stop reading.
let src = from(function*() { yield 1; yield 2; yield 3; });
subscribe(src,
(next) => { console.log("Next:", next); },
() => { console.log("Complete"); },
(err) => { console.log("Error:", err); }
);
Given inconsistencies in browser support for anything other than ReadableStream, we opted to make an Operator a function of the form:
type Op<T, R> = (src: ReadableStream<T>) => ReadableStream<R>
This only requires ReadableStream to be implemented/available with getReader support. To aid in pipelining these operators, a pipe
method is available:
Pipes a source stream through a series of operators, creating a transformation pipeline. Each operator transforms the stream in some way.
let input = [1, 2, 3, 4];
let expected = { "1": 1, "2": 2, "4": 4 };
let result = await toPromise(
pipe(
from(input),
filter(x => x != 3),
buffer(Infinity),
map(x => {
return x.reduce((p, c) => { p[c.toString()] = c; return p }, {});
}),
first()
));
Buffers elements and emits them as arrays when the buffer reaches the specified count. The final buffer (if not empty) is emitted when the source stream completes.
let input = [1, 2, 3, 4];
let expected = [[1, 2], [3, 4]];
let stream = pipe(from(input), buffer(2));
let result = await toArray(stream);
Given a ReadableStream of ReadableStreams, concatenates the output of each stream in sequence.
let input = [from([1, 2]), from([3, 4]), from([5])];
let expected = [1, 2, 3, 4, 5];
let stream = pipe(from(input), concatAll());
let result = await toArray(stream);
Filters out chunks that fail a predicate test. Only values that pass the predicate are emitted.
let input = [1, 2, 3, 4];
let expected = [1, 2, 4];
let stream = pipe(from(input), filter(x => x != 3));
let result = await toArray(stream);
returns a stream of one chunk, the first to return true when passed to the selector, or simply the first if no predicate is supplied
let input = [1,2,3,4];
let expected = 3;
let stream = first(x=>x>=3)(from(input));
let result = await toPromise(stream);
returns a stream of one chunk, the last to return true when passed to the selector, or simply the last chunk if no predicate is supplied
let input = [1,2,3,4];
let expected = 3;
let stream = last(x=>x<=3)(from(input));
let result = await toPromise(stream);
Maps each value in a stream through a selector function. Values where the selector returns undefined are filtered out.
let input = [1, 2, 3, 4];
let expected = [2, 4, 6, 8];
let stream = pipe(from(input), map(x => x * 2));
let result = await toArray(stream);
Emits only the first value that matches the selector, then completes. If no selector is provided, emits the first value.
let input = [1, 2, 3, 4];
let expected = [2];
let stream = pipe(from(input), first(x => x % 2 === 0));
let result = await toArray(stream);
Emits only the last value that matches the selector. If no selector is provided, emits the last value. The stream must complete for the last value to be emitted.
let input = [1, 2, 3, 4];
let expected = [3];
let stream = pipe(from(input), last(x => x < 4));
let result = await toArray(stream);
Skips the first count
elements and then streams the rest to the output.
let input = [1, 2, 3, 4, 5];
let expected = [3, 4, 5];
let stream = pipe(from(input), skip(2));
let result = await toArray(stream);
Takes only the first count
elements, then completes the stream.
let input = [1, 2, 3, 4, 5];
let expected = [1, 2];
let stream = pipe(from(input), take(2));
let result = await toArray(stream);
Emits an error if the duration waiting for a chunk exceeds the specified timeout in milliseconds.
let stream = pipe(
from(slowAsyncGenerator()),
timeout(5000) // Error if no value within 5 seconds
);
Buffers elements until a duration of time has passed since the last chunk, then emits the buffer.
let stream = pipe(
from(rapidValueStream),
debounceTime(1000) // Wait 1 second after last value
);
Allows observing each chunk without modifying the stream. The output is exactly the same as the input.
let input = [1, 2, 3, 4];
let expected = [1, 2, 3, 4];
let sideEffects = [];
let stream = pipe(
from(input),
tap(x => sideEffects.push(x))
);
let result = await toArray(stream);
throws an error if the duration between chunks exceeds the duration (milliseconds)
Subjects are duplex streams that act as both readable and writable streams with automatic tee'ing of the readable side. Each access to subject.readable
returns a new ReadableStream that will receive all subsequent values.
A Subject is a special type of stream that allows values to be multicast to many observers. It implements both readable and writable streams with proper backpressure handling.
Key features:
- Each call to
.readable
returns a new ReadableStream - Values can be pushed via
.writable
or directly via.next()
- Automatic resource cleanup and error handling
- Full Web Streams compatibility
Properties:
readonly readable: ReadableStream<T>; // Creates a new readable stream
readonly writable: WritableStream<T>; // Writable side for piping
readonly closed: boolean; // Whether the subject is closed
Methods:
next(value: T): Promise<number>; // Push a value directly
complete(): Promise<void>; // Complete the subject
error(err: any): Promise<void>; // Error the subject
subscribe(subscriber): SubscriptionLike; // Subscribe with callbacks
Example usage:
import { Subject, pipe, map, filter, toArray } from 'web-streams-extensions';
// Create a subject
const subject = new Subject<number>();
// Get multiple readers
const reader1 = toArray(pipe(subject.readable, filter(x => x % 2 === 0)));
const reader2 = toArray(pipe(subject.readable, map(x => x * 2)));
// Push values
await subject.next(1);
await subject.next(2);
await subject.next(3);
await subject.next(4);
await subject.complete();
const evenNumbers = await reader1; // [2, 4]
const doubled = await reader2; // [2, 4, 6, 8]
pipeTo to a subject:
const source = from([1, 2, 3, 4]);
const subject = new Subject<number>();
// Pipe source to subject
source.pipeTo(subject.writable);
// Read from subject
const result = await toArray(subject.readable);
console.log(result); // [1, 2, 3, 4]
pipeThrough` the subject:
let input = [1, 2, 3, 4];
let subject = new Subject<number>();
let result = await toArray(from(input).pipeThrough(subject));
expect(result).to.be.deep.eq(expected); // [1,2,3,4]
A BehaviourSubject is like a Subject but remembers the last emitted value and immediately emits it to new subscribers.
const behaviorSubject = new BehaviourSubject(42);
// New subscribers immediately get the last value
const result = await toArray(pipe(behaviorSubject.readable, take(1)));
console.log(result); // [42]
All operators and functions include proper error handling with resource cleanup:
try {
const result = await toArray(
pipe(
from(mightThrowSource),
map(x => x.riskyOperation()),
timeout(5000)
)
);
} catch (error) {
console.error('Stream error:', error);
// All resources are automatically cleaned up
}
The library properly handles backpressure throughout the pipeline:
// Slow consumer will naturally backpressure the fast producer
const slowStream = pipe(
from(fastProducer()),
map(async x => {
await sleep(100); // Slow async operation
return x;
})
);
This library requires support for:
- ReadableStream (for all functionality)
- WritableStream (for Subjects)
- async/await (ES2017)
For older browsers, use the web-streams-polyfill:
import 'web-streams-polyfill/polyfill';
import { from, pipe, map } from 'web-streams-extensions';