From cfbd5ce356620ad7fe25b508e13b9df10313ec3e Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 19 Dec 2023 21:05:51 -0600 Subject: [PATCH] WIP --- nx.json | 2 +- package.json | 2 +- packages/observable/package.json | 135 ++ packages/observable/src/index.ts | 11 + .../observable/src/internal/Observable.ts | 1308 ++++++++++++++++ .../src/internal/getObservableInputType.ts | 48 + packages/observable/src/internal/types.ts | 157 ++ packages/observable/src/internal/utils.ts | 103 ++ packages/observable/src/tsconfig.base.json | 13 + packages/observable/src/tsconfig.cjs.json | 9 + .../observable/src/tsconfig.cjs.spec.json | 10 + packages/observable/src/tsconfig.esm.json | 8 + packages/observable/src/tsconfig.types.json | 13 + .../observable/src/tsconfig.types.spec.json | 7 + packages/observable/tsconfig.json | 28 + packages/rxjs/src/index.ts | 5 +- packages/rxjs/src/internal/AsyncSubject.ts | 2 +- packages/rxjs/src/internal/BehaviorSubject.ts | 2 +- packages/rxjs/src/internal/Observable.ts | 1381 +---------------- packages/rxjs/src/internal/Operator.ts | 2 +- packages/rxjs/src/internal/ReplaySubject.ts | 2 +- packages/rxjs/src/internal/Scheduler.ts | 2 +- packages/rxjs/src/internal/Subject.ts | 4 +- packages/rxjs/src/internal/ajax/ajax.ts | 2 +- packages/rxjs/src/internal/firstValueFrom.ts | 4 +- packages/rxjs/src/internal/lastValueFrom.ts | 2 +- .../src/internal/observable/bindCallback.ts | 2 +- .../observable/bindCallbackInternals.ts | 2 +- .../internal/observable/bindNodeCallback.ts | 2 +- .../src/internal/observable/combineLatest.ts | 4 +- .../rxjs/src/internal/observable/concat.ts | 4 +- .../src/internal/observable/connectable.ts | 4 +- .../rxjs/src/internal/observable/defer.ts | 2 +- .../observable/dom/WebSocketSubject.ts | 4 +- .../observable/dom/animationFrames.ts | 2 +- .../rxjs/src/internal/observable/dom/fetch.ts | 2 +- .../rxjs/src/internal/observable/empty.ts | 2 +- .../rxjs/src/internal/observable/forkJoin.ts | 2 +- .../rxjs/src/internal/observable/fromEvent.ts | 5 +- .../internal/observable/fromEventPattern.ts | 3 +- .../internal/observable/fromSubscribable.ts | 4 +- .../rxjs/src/internal/observable/generate.ts | 2 +- packages/rxjs/src/internal/observable/iif.ts | 2 +- .../rxjs/src/internal/observable/interval.ts | 2 +- .../rxjs/src/internal/observable/merge.ts | 4 +- .../rxjs/src/internal/observable/never.ts | 2 +- packages/rxjs/src/internal/observable/of.ts | 74 - .../internal/observable/onErrorResumeNext.ts | 2 +- .../rxjs/src/internal/observable/partition.ts | 4 +- packages/rxjs/src/internal/observable/race.ts | 4 +- .../rxjs/src/internal/observable/range.ts | 2 +- .../src/internal/observable/throwError.ts | 2 +- .../rxjs/src/internal/observable/timer.ts | 2 +- .../rxjs/src/internal/observable/using.ts | 2 +- packages/rxjs/src/internal/observable/zip.ts | 2 +- packages/rxjs/src/internal/operators/audit.ts | 4 +- .../rxjs/src/internal/operators/buffer.ts | 2 +- .../src/internal/operators/bufferCount.ts | 2 +- .../rxjs/src/internal/operators/bufferTime.ts | 2 +- .../src/internal/operators/bufferToggle.ts | 2 +- .../rxjs/src/internal/operators/bufferWhen.ts | 4 +- .../rxjs/src/internal/operators/catchError.ts | 4 +- .../internal/operators/combineLatestWith.ts | 2 +- .../rxjs/src/internal/operators/concatWith.ts | 2 +- .../rxjs/src/internal/operators/connect.ts | 2 +- .../rxjs/src/internal/operators/debounce.ts | 4 +- .../src/internal/operators/debounceTime.ts | 4 +- .../src/internal/operators/defaultIfEmpty.ts | 2 +- .../rxjs/src/internal/operators/delayWhen.ts | 2 +- .../src/internal/operators/dematerialize.ts | 2 +- .../rxjs/src/internal/operators/distinct.ts | 2 +- .../operators/distinctUntilChanged.ts | 2 +- .../rxjs/src/internal/operators/elementAt.ts | 2 +- .../rxjs/src/internal/operators/endWith.ts | 3 +- packages/rxjs/src/internal/operators/every.ts | 2 +- .../rxjs/src/internal/operators/exhaustMap.ts | 4 +- .../rxjs/src/internal/operators/expand.ts | 2 +- .../rxjs/src/internal/operators/filter.ts | 2 +- .../rxjs/src/internal/operators/finalize.ts | 2 +- packages/rxjs/src/internal/operators/find.ts | 4 +- .../rxjs/src/internal/operators/findIndex.ts | 2 +- packages/rxjs/src/internal/operators/first.ts | 2 +- .../rxjs/src/internal/operators/groupBy.ts | 2 +- .../src/internal/operators/ignoreElements.ts | 2 +- .../rxjs/src/internal/operators/isEmpty.ts | 2 +- .../internal/operators/joinAllInternals.ts | 2 +- packages/rxjs/src/internal/operators/last.ts | 2 +- packages/rxjs/src/internal/operators/map.ts | 2 +- .../src/internal/operators/materialize.ts | 5 +- packages/rxjs/src/internal/operators/max.ts | 2 +- .../src/internal/operators/mergeInternals.ts | 4 +- .../rxjs/src/internal/operators/mergeMap.ts | 2 +- .../rxjs/src/internal/operators/mergeScan.ts | 2 +- .../rxjs/src/internal/operators/mergeWith.ts | 2 +- packages/rxjs/src/internal/operators/min.ts | 2 +- .../rxjs/src/internal/operators/observeOn.ts | 2 +- .../rxjs/src/internal/operators/pairwise.ts | 2 +- .../rxjs/src/internal/operators/partition.ts | 2 +- .../rxjs/src/internal/operators/raceWith.ts | 2 +- .../rxjs/src/internal/operators/reduce.ts | 2 +- .../rxjs/src/internal/operators/repeat.ts | 4 +- .../rxjs/src/internal/operators/repeatWhen.ts | 4 +- packages/rxjs/src/internal/operators/retry.ts | 4 +- .../rxjs/src/internal/operators/retryWhen.ts | 4 +- .../rxjs/src/internal/operators/sample.ts | 2 +- packages/rxjs/src/internal/operators/scan.ts | 2 +- .../src/internal/operators/scanInternals.ts | 4 +- .../src/internal/operators/sequenceEqual.ts | 2 +- packages/rxjs/src/internal/operators/share.ts | 4 +- .../rxjs/src/internal/operators/single.ts | 2 +- .../rxjs/src/internal/operators/skipLast.ts | 2 +- .../rxjs/src/internal/operators/skipUntil.ts | 2 +- .../rxjs/src/internal/operators/skipWhile.ts | 2 +- .../rxjs/src/internal/operators/startWith.ts | 2 +- .../src/internal/operators/subscribeOn.ts | 2 +- .../rxjs/src/internal/operators/switchMap.ts | 4 +- .../rxjs/src/internal/operators/switchScan.ts | 2 +- packages/rxjs/src/internal/operators/take.ts | 2 +- .../rxjs/src/internal/operators/takeLast.ts | 2 +- .../rxjs/src/internal/operators/takeUntil.ts | 2 +- .../rxjs/src/internal/operators/takeWhile.ts | 2 +- packages/rxjs/src/internal/operators/tap.ts | 3 +- .../rxjs/src/internal/operators/throttle.ts | 4 +- .../src/internal/operators/throwIfEmpty.ts | 2 +- .../src/internal/operators/timeInterval.ts | 2 +- .../rxjs/src/internal/operators/timeout.ts | 4 +- .../rxjs/src/internal/operators/toArray.ts | 2 +- .../rxjs/src/internal/operators/window.ts | 2 +- .../src/internal/operators/windowCount.ts | 2 +- .../rxjs/src/internal/operators/windowTime.ts | 2 +- .../src/internal/operators/windowToggle.ts | 2 +- .../rxjs/src/internal/operators/windowWhen.ts | 4 +- .../src/internal/operators/withLatestFrom.ts | 2 +- .../rxjs/src/internal/operators/zipWith.ts | 2 +- .../src/internal/scheduled/scheduleArray.ts | 2 +- .../scheduled/scheduleAsyncIterable.ts | 2 +- .../internal/scheduled/scheduleIterable.ts | 3 +- .../internal/scheduled/scheduleObservable.ts | 2 +- .../src/internal/scheduled/schedulePromise.ts | 2 +- .../scheduled/scheduleReadableStreamLike.ts | 4 +- .../rxjs/src/internal/scheduled/scheduled.ts | 4 +- .../rxjs/src/internal/scheduler/Action.ts | 2 +- .../src/internal/scheduler/AsyncAction.ts | 2 +- .../src/internal/scheduler/QueueAction.ts | 2 +- .../scheduler/VirtualTimeScheduler.ts | 2 +- .../scheduler/animationFrameProvider.ts | 2 +- .../src/internal/testing/ColdObservable.ts | 4 +- .../src/internal/testing/HotObservable.ts | 4 +- .../src/internal/testing/TestScheduler.ts | 4 +- packages/rxjs/src/internal/types.ts | 2 +- packages/rxjs/src/internal/util/args.ts | 2 +- .../rxjs/src/internal/util/executeSchedule.ts | 2 +- .../rxjs/src/internal/util/isScheduler.ts | 2 +- packages/rxjs/src/internal/util/rx.ts | 4 +- 154 files changed, 2033 insertions(+), 1631 deletions(-) create mode 100644 packages/observable/package.json create mode 100644 packages/observable/src/index.ts create mode 100644 packages/observable/src/internal/Observable.ts create mode 100644 packages/observable/src/internal/getObservableInputType.ts create mode 100644 packages/observable/src/internal/types.ts create mode 100644 packages/observable/src/internal/utils.ts create mode 100644 packages/observable/src/tsconfig.base.json create mode 100644 packages/observable/src/tsconfig.cjs.json create mode 100644 packages/observable/src/tsconfig.cjs.spec.json create mode 100644 packages/observable/src/tsconfig.esm.json create mode 100644 packages/observable/src/tsconfig.types.json create mode 100644 packages/observable/src/tsconfig.types.spec.json create mode 100644 packages/observable/tsconfig.json delete mode 100644 packages/rxjs/src/internal/observable/of.ts diff --git a/nx.json b/nx.json index 8b5310d32a..71008a12f8 100644 --- a/nx.json +++ b/nx.json @@ -27,7 +27,7 @@ }, "groups": { "npm": { - "projects": ["rxjs"], + "projects": ["rxjs", "@rxjs/observable"], "version": { "generatorOptions": { "currentVersionResolver": "git-tag", diff --git a/package.json b/package.json index 97abf988fe..8a91a570f5 100644 --- a/package.json +++ b/package.json @@ -3,7 +3,7 @@ "private": true, "workspaces": { "packages": [ - "packages/rxjs", + "packages/*", "apps/rxjs.dev" ], "nohoist": [ diff --git a/packages/observable/package.json b/packages/observable/package.json new file mode 100644 index 0000000000..177d76dbf4 --- /dev/null +++ b/packages/observable/package.json @@ -0,0 +1,135 @@ +{ + "name": "@rxjs/observable", + "version": "8.0.0-alpha.13", + "description": "Reactive Extensions for modern JavaScript", + "main": "./dist/cjs/index.js", + "module": "./dist/esm/index.js", + "types": "index.d.ts", + "typesVersions": { + ">=4.2": { + "*": [ + "dist/types/*" + ] + } + }, + "sideEffects": false, + "exports": { + ".": { + "types": "./dist/types/index.d.ts", + "node": "./dist/cjs/index.js", + "require": "./dist/cjs/index.js", + "default": "./dist/esm/index.js" + }, + "./internal/*": { + "types": "./dist/types/internal/*.d.ts", + "node": "./dist/cjs/internal/*.js", + "require": "./dist/cjs/internal/*.js", + "default": "./dist/esm/internal/*.js" + }, + "./package.json": "./package.json" + }, + "config": { + "commitizen": { + "path": "cz-conventional-changelog" + } + }, + "lint-staged": { + "*.js": "eslint --cache --fix", + "(src|spec)/**/*.ts": [ + "eslint --fix", + "prettier --write" + ], + "*.{js,css,md}": "prettier --write" + }, + "scripts": { + "changelog": "npx conventional-changelog-cli -p angular -i CHANGELOG.md -s", + "lint": "eslint --ext=ts,js src spec spec-dtslint", + "dtslint": "npm run lint && tsc -b ./src/tsconfig.types.json", + "test": "yarn build && cross-env TS_NODE_PROJECT=tsconfig.mocha.json mocha --config ../rxjs/src/support/.mocharc.js \"src/**/*-spec.ts\"", + "compile": "tsc -b ./src/tsconfig.cjs.json ./src/tsconfig.cjs.spec.json ./src/tsconfig.esm.json ./src/tsconfig.types.json ./src/tsconfig.types.spec.json", + "build:clean": "shx rm -rf ./dist", + "build": "yarn build:clean && yarn compile", + "copy_common_package_files": "node ../../scripts/copy-common-package-files.js" + }, + "repository": { + "type": "git", + "url": "https://github.com/reactivex/rxjs.git", + "directory": "packages/rxjs" + }, + "keywords": [ + "Rx", + "RxJS", + "ReactiveX", + "ReactiveExtensions", + "Streams", + "Observables", + "Observable", + "Stream" + ], + "author": "Ben Lesh ", + "license": "Apache-2.0", + "bugs": { + "url": "https://github.com/ReactiveX/RxJS/issues" + }, + "homepage": "https://rxjs.dev", + "devDependencies": { + "@swc/core": "^1.2.128", + "@swc/helpers": "^0.3.2", + "@types/chai": "^4.2.11", + "@types/lodash": "^4.14.198", + "@types/mocha": "^10.0.1", + "@types/node": "^14.14.6", + "@types/shelljs": "^0.8.8", + "@types/sinon": "^10.0.13", + "@types/sinon-chai": "^3.2.9", + "@typescript-eslint/eslint-plugin": "^6.9.0", + "@typescript-eslint/parser": "^6.9.0", + "chai": "^4.3.7", + "color": "3.0.0", + "colors": "1.1.2", + "cross-env": "5.1.3", + "cz-conventional-changelog": "1.2.0", + "dependency-cruiser": "^9.12.0", + "eslint": "^8.52.0", + "form-data": "^3.0.0", + "fs-extra": "^8.1.0", + "husky": "^4.2.5", + "lint-staged": "^10.2.11", + "lodash": "^4.17.21", + "mocha": "^10.2.0", + "nodemon": "^1.9.2", + "npm-run-all": "4.1.2", + "prettier": "^2.5.1", + "shelljs": "^0.8.4", + "shx": "^0.3.2", + "sinon": "^15.0.1", + "sinon-chai": "^3.7.0", + "source-map-support": "0.5.3", + "ts-node": "^10.9.1", + "typescript": "~4.9.4", + "validate-commit-msg": "2.14.0", + "web-streams-polyfill": "^3.0.2" + }, + "files": [ + "dist/cjs/**/!(*.tsbuildinfo)", + "dist/esm/**/!(*.tsbuildinfo)", + "dist/types/**/!(*.tsbuildinfo)", + "src", + "CHANGELOG.md", + "CODE_OF_CONDUCT.md", + "LICENSE.txt", + "package.json", + "README.md", + "tsconfig.json" + ], + "husky": { + "hooks": { + "pre-commit": "lint-staged", + "commit-msg": "validate-commit-msg" + } + }, + "peerDependencies": { + "@types/node": "^20.6.3", + "typescript": "^5.2.2" + } +} diff --git a/packages/observable/src/index.ts b/packages/observable/src/index.ts new file mode 100644 index 0000000000..5d055ffe84 --- /dev/null +++ b/packages/observable/src/index.ts @@ -0,0 +1,11 @@ +export { Observable, from, config, Subscription, Subscriber, operate, UnsubscriptionError, isObservable } from './internal/Observable.js'; +export type { GlobalConfig, SubscriberOverrides } from './internal/Observable.js'; +export { + Subscribable, + SubscriptionLike, + TeardownLogic, + Unsubscribable, + UnaryFunction, + OperatorFunction, + ObservableInput, +} from './internal/types.js'; diff --git a/packages/observable/src/internal/Observable.ts b/packages/observable/src/internal/Observable.ts new file mode 100644 index 0000000000..4ec5003c00 --- /dev/null +++ b/packages/observable/src/internal/Observable.ts @@ -0,0 +1,1308 @@ +import { isFunction, nextNotification, errorNotification, COMPLETE_NOTIFICATION, readableStreamLikeToAsyncGenerator } from './utils.js'; +import type { + TeardownLogic, + UnaryFunction, + Subscribable, + Observer, + OperatorFunction, + Unsubscribable, + SubscriptionLike, + ObservableNotification, + ObservableInput, + ObservedValueOf, + ReadableStreamLike, + ValueFromArray, +} from './types.js'; +import { getObservableInputType, ObservableInputType } from './getObservableInputType.js'; + +/** + * An error thrown when one or more errors have occurred during the + * `unsubscribe` of a {@link Subscription}. + */ +export class UnsubscriptionError extends Error { + /** + * @deprecated Internal implementation detail. Do not construct error instances. + * Cannot be tagged as internal: https://github.com/ReactiveX/rxjs/issues/6269 + */ + constructor(public errors: any[]) { + super( + errors + ? `${errors.length} errors occurred during unsubscription: +${errors.map((err, i) => `${i + 1}) ${err.toString()}`).join('\n ')}` + : '' + ); + this.name = 'UnsubscriptionError'; + } +} + +/** + * Represents a disposable resource, such as the execution of an Observable. A + * Subscription has one important method, `unsubscribe`, that takes no argument + * and just disposes the resource held by the subscription. + * + * Additionally, subscriptions may be grouped together through the `add()` + * method, which will attach a child Subscription to the current Subscription. + * When a Subscription is unsubscribed, all its children (and its grandchildren) + * will be unsubscribed as well. + */ +export class Subscription implements SubscriptionLike { + public static EMPTY = (() => { + const empty = new Subscription(); + empty.closed = true; + return empty; + })(); + + /** + * A flag to indicate whether this Subscription has already been unsubscribed. + */ + public closed = false; + + /** + * The list of registered finalizers to execute upon unsubscription. Adding and removing from this + * list occurs in the {@link #add} and {@link #remove} methods. + */ + private _finalizers: Set> | null = null; + + /** + * @param initialTeardown A function executed first as part of the finalization + * process that is kicked off when {@link #unsubscribe} is called. + */ + constructor(private initialTeardown?: () => void) {} + + /** + * Disposes the resources held by the subscription. May, for instance, cancel + * an ongoing Observable execution or cancel any other type of work that + * started when the Subscription was created. + */ + unsubscribe(): void { + let errors: any[] | undefined; + + if (!this.closed) { + this.closed = true; + + const { initialTeardown: initialFinalizer } = this; + if (isFunction(initialFinalizer)) { + try { + initialFinalizer(); + } catch (e) { + errors = e instanceof UnsubscriptionError ? e.errors : [e]; + } + } + + const { _finalizers } = this; + if (_finalizers) { + this._finalizers = null; + for (const finalizer of _finalizers) { + try { + execFinalizer(finalizer); + } catch (err) { + errors = errors ?? []; + if (err instanceof UnsubscriptionError) { + errors.push(...err.errors); + } else { + errors.push(err); + } + } + } + } + + if (errors) { + throw new UnsubscriptionError(errors); + } + } + } + + /** + * Adds a finalizer to this subscription, so that finalization will be unsubscribed/called + * when this subscription is unsubscribed. If this subscription is already {@link #closed}, + * because it has already been unsubscribed, then whatever finalizer is passed to it + * will automatically be executed (unless the finalizer itself is also a closed subscription). + * + * Closed Subscriptions cannot be added as finalizers to any subscription. Adding a closed + * subscription to a any subscription will result in no operation. (A noop). + * + * Adding a subscription to itself, or adding `null` or `undefined` will not perform any + * operation at all. (A noop). + * + * `Subscription` instances that are added to this instance will automatically remove themselves + * if they are unsubscribed. Functions and {@link Unsubscribable} objects that you wish to remove + * will need to be removed manually with {@link #remove} + * + * @param teardown The finalization logic to add to this subscription. + */ + add(teardown: TeardownLogic): void { + // Only add the finalizer if it's not undefined + // and don't add a subscription to itself. + if (teardown && teardown !== this) { + if (this.closed) { + // If this subscription is already closed, + // execute whatever finalizer is handed to it automatically. + execFinalizer(teardown); + } else { + if (teardown && 'add' in teardown) { + // If teardown is a subscription, we can make sure that if it + // unsubscribes first, it removes itself from this subscription. + teardown.add(() => { + this.remove(teardown); + }); + } + + this._finalizers ??= new Set(); + this._finalizers.add(teardown); + } + } + } + + /** + * Removes a finalizer from this subscription that was previously added with the {@link #add} method. + * + * Note that `Subscription` instances, when unsubscribed, will automatically remove themselves + * from every other `Subscription` they have been added to. This means that using the `remove` method + * is not a common thing and should be used thoughtfully. + * + * If you add the same finalizer instance of a function or an unsubscribable object to a `Subscription` instance + * more than once, you will need to call `remove` the same number of times to remove all instances. + * + * All finalizer instances are removed to free up memory upon unsubscription. + * + * TIP: In instances you're adding and removing _Subscriptions from other Subscriptions_, you should + * be sure to unsubscribe or otherwise get rid of the child subscription reference as soon as you remove it. + * The child subscription has a reference to the parent it was added to via closure. In most cases, this + * a non-issue, as child subscriptions are rarely long-lived. + * + * @param teardown The finalizer to remove from this subscription + */ + remove(teardown: Exclude): void { + this._finalizers?.delete(teardown); + } +} + +// Even though Subscription only conditionally implements `Symbol.dispose` +// if it's available, we still need to declare it here so that TypeScript +// knows that it exists on the prototype when it is available. +export interface Subscription { + [Symbol.dispose](): void; +} + +if (typeof Symbol.dispose === 'symbol') { + Subscription.prototype[Symbol.dispose] = Subscription.prototype.unsubscribe; +} + +function execFinalizer(finalizer: Unsubscribable | (() => void)) { + if (isFunction(finalizer)) { + finalizer(); + } else { + finalizer.unsubscribe(); + } +} + +export interface SubscriberOverrides { + /** + * If provided, this function will be called whenever the {@link Subscriber}'s + * `next` method is called, with the value that was passed to that call. If + * an error is thrown within this function, it will be handled and passed to + * the destination's `error` method. + * @param value The value that is being observed from the source. + */ + next?: (value: T) => void; + /** + * If provided, this function will be called whenever the {@link Subscriber}'s + * `error` method is called, with the error that was passed to that call. If + * an error is thrown within this function, it will be handled and passed to + * the destination's `error` method. + * @param err An error that has been thrown by the source observable. + */ + error?: (err: any) => void; + /** + * If provided, this function will be called whenever the {@link Subscriber}'s + * `complete` method is called. If an error is thrown within this function, it + * will be handled and passed to the destination's `error` method. + */ + complete?: () => void; + /** + * If provided, this function will be called after all teardown has occurred + * for this {@link Subscriber}. This is generally used for cleanup purposes + * during operator development. + */ + finalize?: () => void; +} + +/** + * Implements the {@link Observer} interface and extends the + * {@link Subscription} class. While the {@link Observer} is the public API for + * consuming the values of an {@link Observable}, all Observers get converted to + * a Subscriber, in order to provide Subscription-like capabilities such as + * `unsubscribe`. Subscriber is a common type in RxJS, and crucial for + * implementing operators, but it is rarely used as a public API. + */ +export class Subscriber extends Subscription implements Observer { + /** @internal */ + protected isStopped: boolean = false; + /** @internal */ + protected destination: Observer; + + /** @internal */ + protected readonly _nextOverride: ((value: T) => void) | null = null; + /** @internal */ + protected readonly _errorOverride: ((err: any) => void) | null = null; + /** @internal */ + protected readonly _completeOverride: (() => void) | null = null; + /** @internal */ + protected readonly _onFinalize: (() => void) | null = null; + + /** + * @deprecated Do not create instances of `Subscriber` directly. Use {@link operate} instead. + */ + constructor(destination?: Subscriber | Partial> | ((value: T) => void) | null); + + /** + * @internal + */ + constructor(destination: Subscriber | Partial> | ((value: any) => void) | null, overrides: SubscriberOverrides); + + /** + * Creates an instance of an RxJS Subscriber. This is the workhorse of the library. + * + * If another instance of Subscriber is passed in, it will automatically wire up unsubscription + * between this instance and the passed in instance. + * + * If a partial or full observer is passed in, it will be wrapped and appropriate safeguards will be applied. + * + * If a next-handler function is passed in, it will be wrapped and appropriate safeguards will be applied. + * + * @param destination A subscriber, partial observer, or function that receives the next value. + * @deprecated Do not create instances of `Subscriber` directly. Use {@link operate} instead. + */ + constructor(destination?: Subscriber | Partial> | ((value: T) => void) | null, overrides?: SubscriberOverrides) { + super(); + + // The only way we know that error reporting safety has been applied is if we own it. + this.destination = destination instanceof Subscriber ? destination : createSafeObserver(destination); + + this._nextOverride = overrides?.next ?? null; + this._errorOverride = overrides?.error ?? null; + this._completeOverride = overrides?.complete ?? null; + this._onFinalize = overrides?.finalize ?? null; + + // It's important - for performance reasons - that all of this class's + // members are initialized and that they are always initialized in the same + // order. This will ensure that all Subscriber instances have the + // same hidden class in V8. This, in turn, will help keep the number of + // hidden classes involved in property accesses within the base class as + // low as possible. If the number of hidden classes involved exceeds four, + // the property accesses will become megamorphic and performance penalties + // will be incurred - i.e. inline caches won't be used. + // + // The reasons for ensuring all instances have the same hidden class are + // further discussed in this blog post from Benedikt Meurer: + // https://benediktmeurer.de/2018/03/23/impact-of-polymorphism-on-component-based-frameworks-like-react/ + this._next = this._nextOverride ? overrideNext : this._next; + this._error = this._errorOverride ? overrideError : this._error; + this._complete = this._completeOverride ? overrideComplete : this._complete; + + // Automatically chain subscriptions together here. + // if destination appears to be one of our subscriptions, we'll chain it. + if (hasAddAndUnsubscribe(destination)) { + destination.add(this); + } + } + + /** + * The {@link Observer} callback to receive notifications of type `next` from + * the Observable, with a value. The Observable may call this method 0 or more + * times. + * @param value The `next` value. + */ + next(value: T): void { + if (this.isStopped) { + handleStoppedNotification(nextNotification(value), this); + } else { + this._next(value!); + } + } + + /** + * The {@link Observer} callback to receive notifications of type `error` from + * the Observable, with an attached `Error`. Notifies the Observer that + * the Observable has experienced an error condition. + * @param err The `error` exception. + */ + error(err?: any): void { + if (this.isStopped) { + handleStoppedNotification(errorNotification(err), this); + } else { + this.isStopped = true; + this._error(err); + } + } + + /** + * The {@link Observer} callback to receive a valueless notification of type + * `complete` from the Observable. Notifies the Observer that the Observable + * has finished sending push-based notifications. + */ + complete(): void { + if (this.isStopped) { + handleStoppedNotification(COMPLETE_NOTIFICATION, this); + } else { + this.isStopped = true; + this._complete(); + } + } + + unsubscribe(): void { + if (!this.closed) { + this.isStopped = true; + super.unsubscribe(); + this._onFinalize?.(); + } + } + + protected _next(value: T): void { + this.destination.next(value); + } + + protected _error(err: any): void { + try { + this.destination.error(err); + } finally { + this.unsubscribe(); + } + } + + protected _complete(): void { + try { + this.destination.complete(); + } finally { + this.unsubscribe(); + } + } +} + +/** + * The {@link GlobalConfig} object for RxJS. It is used to configure things + * like how to react on unhandled errors. + */ +export const config: GlobalConfig = { + onUnhandledError: null, + onStoppedNotification: null, +}; + +/** + * The global configuration object for RxJS, used to configure things + * like how to react on unhandled errors. Accessible via {@link config} + * object. + */ +export interface GlobalConfig { + /** + * A registration point for unhandled errors from RxJS. These are errors that + * cannot were not handled by consuming code in the usual subscription path. For + * example, if you have this configured, and you subscribe to an observable without + * providing an error handler, errors from that subscription will end up here. This + * will _always_ be called asynchronously on another job in the runtime. This is because + * we do not want errors thrown in this user-configured handler to interfere with the + * behavior of the library. + */ + onUnhandledError: ((err: any) => void) | null; + + /** + * A registration point for notifications that cannot be sent to subscribers because they + * have completed, errored or have been explicitly unsubscribed. By default, next, complete + * and error notifications sent to stopped subscribers are noops. However, sometimes callers + * might want a different behavior. For example, with sources that attempt to report errors + * to stopped subscribers, a caller can configure RxJS to throw an unhandled error instead. + * This will _always_ be called asynchronously on another job in the runtime. This is because + * we do not want errors thrown in this user-configured handler to interfere with the + * behavior of the library. + */ + onStoppedNotification: ((notification: ObservableNotification, subscriber: Subscriber) => void) | null; +} + +function overrideNext(this: Subscriber, value: T): void { + try { + this._nextOverride!(value); + } catch (error) { + this.destination.error(error); + } +} + +function overrideError(this: Subscriber, err: any): void { + try { + this._errorOverride!(err); + } catch (error) { + this.destination.error(error); + } finally { + this.unsubscribe(); + } +} + +function overrideComplete(this: Subscriber): void { + try { + this._completeOverride!(); + } catch (error) { + this.destination.error(error); + } finally { + this.unsubscribe(); + } +} + +class ConsumerObserver implements Observer { + constructor(private partialObserver: Partial>) {} + + next(value: T): void { + const { partialObserver } = this; + if (partialObserver.next) { + try { + partialObserver.next(value); + } catch (error) { + reportUnhandledError(error); + } + } + } + + error(err: any): void { + const { partialObserver } = this; + if (partialObserver.error) { + try { + partialObserver.error(err); + } catch (error) { + reportUnhandledError(error); + } + } else { + reportUnhandledError(err); + } + } + + complete(): void { + const { partialObserver } = this; + if (partialObserver.complete) { + try { + partialObserver.complete(); + } catch (error) { + reportUnhandledError(error); + } + } + } +} + +function createSafeObserver(observerOrNext?: Partial> | ((value: T) => void) | null): Observer { + return new ConsumerObserver(!observerOrNext || isFunction(observerOrNext) ? { next: observerOrNext ?? undefined } : observerOrNext); +} + +/** + * A handler for notifications that cannot be sent to a stopped subscriber. + * @param notification The notification being sent. + * @param subscriber The stopped subscriber. + */ +function handleStoppedNotification(notification: ObservableNotification, subscriber: Subscriber) { + const { onStoppedNotification } = config; + onStoppedNotification && setTimeout(() => onStoppedNotification(notification, subscriber)); +} + +function hasAddAndUnsubscribe(value: any): value is Subscription { + return value && isFunction(value.unsubscribe) && isFunction(value.add); +} + +export interface OperateConfig extends SubscriberOverrides { + /** + * The destination subscriber to forward notifications to. This is also the + * subscriber that will receive unhandled errors if your `next`, `error`, or `complete` + * overrides throw. + */ + destination: Subscriber; +} + +/** + * Creates a new {@link Subscriber} instance that passes notifications on to the + * supplied `destination`. The overrides provided in the `config` argument for + * `next`, `error`, and `complete` will be called in such a way that any + * errors are caught and forwarded to the destination's `error` handler. The returned + * `Subscriber` will be "chained" to the `destination` such that when `unsubscribe` is + * called on the `destination`, the returned `Subscriber` will also be unsubscribed. + * + * Advanced: This ensures that subscriptions are properly wired up prior to starting the + * subscription logic. This prevents "synchronous firehose" scenarios where an + * inner observable from a flattening operation cannot be stopped by a downstream + * terminal operator like `take`. + * + * This is a utility designed to be used to create new operators for observables. + * + * For examples, please see our code base. + * + * @param config The configuration for creating a new subscriber for an operator. + * @returns A new subscriber that is chained to the destination. + */ +export function operate({ destination, ...subscriberOverrides }: OperateConfig) { + return new Subscriber(destination, subscriberOverrides); +} + +// Ensure that `Symbol.dispose` is defined in TypeScript +declare global { + interface SymbolConstructor { + readonly dispose: unique symbol; + } +} + +/** + * A representation of any set of values over any amount of time. This is the most basic building block + * of RxJS. + */ +export class Observable implements Subscribable { + /** + * @param subscribe The function that is called when the Observable is + * initially subscribed to. This function is given a Subscriber, to which new values + * can be `next`ed, or an `error` method can be called to raise an error, or + * `complete` can be called to notify of a successful completion. + */ + constructor(subscribe?: (this: Observable, subscriber: Subscriber) => TeardownLogic) { + if (subscribe) { + this._subscribe = subscribe; + } + } + + /** + * Invokes an execution of an Observable and registers Observer handlers for notifications it will emit. + * + * Use it when you have all these Observables, but still nothing is happening. + * + * `subscribe` is not a regular operator, but a method that calls Observable's internal `subscribe` function. It + * might be for example a function that you passed to Observable's constructor, but most of the time it is + * a library implementation, which defines what will be emitted by an Observable, and when it be will emitted. This means + * that calling `subscribe` is actually the moment when Observable starts its work, not when it is created, as it is often + * the thought. + * + * Apart from starting the execution of an Observable, this method allows you to listen for values + * that an Observable emits, as well as for when it completes or errors. You can achieve this in two + * of the following ways. + * + * The first way is creating an object that implements {@link Observer} interface. It should have methods + * defined by that interface, but note that it should be just a regular JavaScript object, which you can create + * yourself in any way you want (ES6 class, classic function constructor, object literal etc.). In particular, do + * not attempt to use any RxJS implementation details to create Observers - you don't need them. Remember also + * that your object does not have to implement all methods. If you find yourself creating a method that doesn't + * do anything, you can simply omit it. Note however, if the `error` method is not provided and an error happens, + * it will be thrown asynchronously. Errors thrown asynchronously cannot be caught using `try`/`catch`. Instead, + * use the {@link onUnhandledError} configuration option or use a runtime handler (like `window.onerror` or + * `process.on('error)`) to be notified of unhandled errors. Because of this, it's recommended that you provide + * an `error` method to avoid missing thrown errors. + * + * The second way is to give up on Observer object altogether and simply provide callback functions in place of its methods. + * This means you can provide three functions as arguments to `subscribe`, where the first function is equivalent + * of a `next` method, the second of an `error` method and the third of a `complete` method. Just as in case of an Observer, + * if you do not need to listen for something, you can omit a function by passing `undefined` or `null`, + * since `subscribe` recognizes these functions by where they were placed in function call. When it comes + * to the `error` function, as with an Observer, if not provided, errors emitted by an Observable will be thrown asynchronously. + * + * You can, however, subscribe with no parameters at all. This may be the case where you're not interested in terminal events + * and you also handled emissions internally by using operators (e.g. using `tap`). + * + * Whichever style of calling `subscribe` you use, in both cases it returns a Subscription object. + * This object allows you to call `unsubscribe` on it, which in turn will stop the work that an Observable does and will clean + * up all resources that an Observable used. Note that cancelling a subscription will not call `complete` callback + * provided to `subscribe` function, which is reserved for a regular completion signal that comes from an Observable. + * + * Remember that callbacks provided to `subscribe` are not guaranteed to be called asynchronously. + * It is an Observable itself that decides when these functions will be called. For example {@link of} + * by default emits all its values synchronously. Always check documentation for how given Observable + * will behave when subscribed and if its default behavior can be modified with a `scheduler`. + * + * #### Examples + * + * Subscribe with an {@link guide/observer Observer} + * + * ```ts + * import { of } from 'rxjs'; + * + * const sumObserver = { + * sum: 0, + * next(value) { + * console.log('Adding: ' + value); + * this.sum = this.sum + value; + * }, + * error() { + * // We actually could just remove this method, + * // since we do not really care about errors right now. + * }, + * complete() { + * console.log('Sum equals: ' + this.sum); + * } + * }; + * + * of(1, 2, 3) // Synchronously emits 1, 2, 3 and then completes. + * .subscribe(sumObserver); + * + * // Logs: + * // 'Adding: 1' + * // 'Adding: 2' + * // 'Adding: 3' + * // 'Sum equals: 6' + * ``` + * + * Subscribe with functions ({@link deprecations/subscribe-arguments deprecated}) + * + * ```ts + * import { of } from 'rxjs' + * + * let sum = 0; + * + * of(1, 2, 3).subscribe( + * value => { + * console.log('Adding: ' + value); + * sum = sum + value; + * }, + * undefined, + * () => console.log('Sum equals: ' + sum) + * ); + * + * // Logs: + * // 'Adding: 1' + * // 'Adding: 2' + * // 'Adding: 3' + * // 'Sum equals: 6' + * ``` + * + * Cancel a subscription + * + * ```ts + * import { interval } from 'rxjs'; + * + * const subscription = interval(1000).subscribe({ + * next(num) { + * console.log(num) + * }, + * complete() { + * // Will not be called, even when cancelling subscription. + * console.log('completed!'); + * } + * }); + * + * setTimeout(() => { + * subscription.unsubscribe(); + * console.log('unsubscribed!'); + * }, 2500); + * + * // Logs: + * // 0 after 1s + * // 1 after 2s + * // 'unsubscribed!' after 2.5s + * ``` + * + * @param observerOrNext Either an {@link Observer} with some or all callback methods, + * or the `next` handler that is called for each value emitted from the subscribed Observable. + * @return A subscription reference to the registered handlers. + */ + subscribe(observerOrNext?: Partial> | ((value: T) => void) | null): Subscription { + const subscriber = observerOrNext instanceof Subscriber ? observerOrNext : new Subscriber(observerOrNext); + subscriber.add(this._trySubscribe(subscriber)); + return subscriber; + } + + /** @internal */ + protected _trySubscribe(sink: Subscriber): TeardownLogic { + try { + return this._subscribe(sink); + } catch (err) { + // We don't need to return anything in this case, + // because it's just going to try to `add()` to a subscription + // above. + sink.error(err); + } + } + + /** + * Used as a NON-CANCELLABLE means of subscribing to an observable, for use with + * APIs that expect promises, like `async/await`. You cannot unsubscribe from this. + * + * **WARNING**: Only use this with observables you *know* will complete. If the source + * observable does not complete, you will end up with a promise that is hung up, and + * potentially all of the state of an async function hanging out in memory. To avoid + * this situation, look into adding something like {@link timeout}, {@link take}, + * {@link takeWhile}, or {@link takeUntil} amongst others. + * + * #### Example + * + * ```ts + * import { interval, take } from 'rxjs'; + * + * const source$ = interval(1000).pipe(take(4)); + * + * async function getTotal() { + * let total = 0; + * + * await source$.forEach(value => { + * total += value; + * console.log('observable -> ' + value); + * }); + * + * return total; + * } + * + * getTotal().then( + * total => console.log('Total: ' + total) + * ); + * + * // Expected: + * // 'observable -> 0' + * // 'observable -> 1' + * // 'observable -> 2' + * // 'observable -> 3' + * // 'Total: 6' + * ``` + * + * @param next A handler for each value emitted by the observable. + * @return A promise that either resolves on observable completion or + * rejects with the handled error. + */ + forEach(next: (value: T) => void): Promise { + return new Promise((resolve, reject) => { + const subscriber = new Subscriber({ + next: (value: T) => { + try { + next(value); + } catch (err) { + reject(err); + subscriber.unsubscribe(); + } + }, + error: reject, + complete: resolve, + }); + this.subscribe(subscriber); + }); + } + + /** @internal */ + protected _subscribe(_subscriber: Subscriber): TeardownLogic { + return; + } + + /** + * An interop point defined by the es7-observable spec https://github.com/zenparsing/es-observable + * @return This instance of the observable. + */ + [Symbol.observable ?? '@@observable']() { + return this; + } + + pipe(): Observable; + pipe(op1: UnaryFunction, A>): A; + pipe(op1: UnaryFunction, A>, op2: UnaryFunction): B; + pipe(op1: UnaryFunction, A>, op2: UnaryFunction, op3: UnaryFunction): C; + pipe(op1: UnaryFunction, A>, op2: UnaryFunction, op3: UnaryFunction, op4: UnaryFunction): D; + pipe( + op1: UnaryFunction, A>, + op2: UnaryFunction, + op3: UnaryFunction, + op4: UnaryFunction, + op5: UnaryFunction + ): E; + pipe( + op1: UnaryFunction, A>, + op2: UnaryFunction, + op3: UnaryFunction, + op4: UnaryFunction, + op5: UnaryFunction, + op6: UnaryFunction + ): F; + pipe( + op1: UnaryFunction, A>, + op2: UnaryFunction, + op3: UnaryFunction, + op4: UnaryFunction, + op5: UnaryFunction, + op6: UnaryFunction, + op7: UnaryFunction + ): G; + pipe( + op1: UnaryFunction, A>, + op2: UnaryFunction, + op3: UnaryFunction, + op4: UnaryFunction, + op5: UnaryFunction, + op6: UnaryFunction, + op7: UnaryFunction, + op8: UnaryFunction + ): H; + pipe( + op1: UnaryFunction, A>, + op2: UnaryFunction, + op3: UnaryFunction, + op4: UnaryFunction, + op5: UnaryFunction, + op6: UnaryFunction, + op7: UnaryFunction, + op8: UnaryFunction, + op9: UnaryFunction + ): I; + pipe( + op1: UnaryFunction, A>, + op2: UnaryFunction, + op3: UnaryFunction, + op4: UnaryFunction, + op5: UnaryFunction, + op6: UnaryFunction, + op7: UnaryFunction, + op8: UnaryFunction, + op9: UnaryFunction, + ...operations: OperatorFunction[] + ): Observable; + pipe( + op1: UnaryFunction, A>, + op2: UnaryFunction, + op3: UnaryFunction, + op4: UnaryFunction, + op5: UnaryFunction, + op6: UnaryFunction, + op7: UnaryFunction, + op8: UnaryFunction, + op9: UnaryFunction, + ...operations: UnaryFunction[] + ): unknown; + + /** + * Used to stitch together functional operators into a chain. + * + * ## Example + * + * ```ts + * import { interval, filter, map, scan } from 'rxjs'; + * + * interval(1000) + * .pipe( + * filter(x => x % 2 === 0), + * map(x => x + x), + * scan((acc, x) => acc + x) + * ) + * .subscribe(x => console.log(x)); + * ``` + * + * @return The Observable result of all the operators having been called + * in the order they were passed in. + */ + pipe(...operations: UnaryFunction[]): unknown { + return operations.reduce(pipeReducer, this as any); + } + + /** + * Observable is async iterable, so it can be used in `for await` loop. This method + * of subscription is cancellable by breaking the for await loop. Although it's not + * recommended to use Observable's AsyncIterable contract outside of `for await`, if + * you're consuming the Observable as an AsyncIterable, and you're _not_ using `for await`, + * you can use the `throw` or `return` methods on the `AsyncGenerator` we return to + * cancel the subscription. Note that the subscription to the observable does not start + * until the first value is requested from the AsyncIterable. + * + * Functionally, this is equivalent to using a {@link concatMap} with an `async` function. + * That means that while the body of the `for await` loop is executing, any values that arrive + * from the observable source will be queued up, so they can be processed by the `for await` + * loop in order. So, like {@link concatMap} it's important to understand the speed your + * source emits at, and the speed of the body of your `for await` loop. + * + * ## Example + * + * ```ts + * import { interval } from 'rxjs'; + * + * async function main() { + * // Subscribe to the observable using for await. + * for await (const value of interval(1000)) { + * console.log(value); + * + * if (value > 5) { + * // Unsubscribe from the interval if we get a value greater than 5 + * break; + * } + * } + * } + * + * main(); + * ``` + */ + [Symbol.asyncIterator](): AsyncGenerator { + let subscription: Subscription | undefined; + let hasError = false; + let error: unknown; + let completed = false; + const values: T[] = []; + const deferreds: [(value: IteratorResult) => void, (reason: unknown) => void][] = []; + + const handleError = (err: unknown) => { + hasError = true; + error = err; + while (deferreds.length) { + const [_, reject] = deferreds.shift()!; + reject(err); + } + }; + + const handleComplete = () => { + completed = true; + while (deferreds.length) { + const [resolve] = deferreds.shift()!; + resolve({ value: undefined, done: true }); + } + }; + + return { + next: (): Promise> => { + if (!subscription) { + // We only want to start the subscription when the user starts iterating. + subscription = this.subscribe({ + next: (value) => { + if (deferreds.length) { + const [resolve] = deferreds.shift()!; + resolve({ value, done: false }); + } else { + values.push(value); + } + }, + error: handleError, + complete: handleComplete, + }); + } + + // If we already have some values in our buffer, we'll return the next one. + if (values.length) { + return Promise.resolve({ value: values.shift()!, done: false }); + } + + // This was already completed, so we're just going to return a done result. + if (completed) { + return Promise.resolve({ value: undefined, done: true }); + } + + // There was an error, so we're going to return an error result. + if (hasError) { + return Promise.reject(error); + } + + // Otherwise, we need to make them wait for a value. + return new Promise((resolve, reject) => { + deferreds.push([resolve, reject]); + }); + }, + throw: (err): Promise> => { + subscription?.unsubscribe(); + // NOTE: I did some research on this, and as of Feb 2023, Chrome doesn't seem to do + // anything with pending promises returned from `next()` when `throw()` is called. + // However, for consumption of observables, I don't want RxJS taking the heat for that + // quirk/leak of the type. So we're going to reject all pending promises we've nexted out here. + handleError(err); + return Promise.reject(err); + }, + return: (): Promise> => { + subscription?.unsubscribe(); + // NOTE: I did some research on this, and as of Feb 2023, Chrome doesn't seem to do + // anything with pending promises returned from `next()` when `throw()` is called. + // However, for consumption of observables, I don't want RxJS taking the heat for that + // quirk/leak of the type. So we're going to resolve all pending promises we've nexted out here. + handleComplete(); + return Promise.resolve({ value: undefined, done: true }); + }, + [Symbol.asyncIterator]() { + return this; + }, + }; + } +} + +function pipeReducer(prev: any, fn: UnaryFunction) { + return fn(prev); +} + +/** + * Handles an error on another job either with the user-configured {@link onUnhandledError}, + * or by throwing it on that new job so it can be picked up by `window.onerror`, `process.on('error')`, etc. + * + * This should be called whenever there is an error that is out-of-band with the subscription + * or when an error hits a terminal boundary of the subscription and no error handler was provided. + * + * @param err the error to report + */ +export function reportUnhandledError(err: any) { + setTimeout(() => { + const { onUnhandledError } = config; + if (onUnhandledError) { + // Execute the user-configured error handler. + onUnhandledError(err); + } else { + // Throw so it is picked up by the runtime's uncaught error mechanism. + throw err; + } + }); +} + +/** + * Creates an Observable from an Array, an array-like object, a Promise, an iterable object, or an Observable-like object. + * + * Converts almost anything to an Observable. + * + * ![](from.png) + * + * `from` converts various other objects and data types into Observables. It also converts a Promise, an array-like, or an + * iterable + * object into an Observable that emits the items in that promise, array, or iterable. A String, in this context, is treated + * as an array of characters. Observable-like objects (contains a function named with the ES2015 Symbol for Observable) can also be + * converted through this operator. + * + * ## Examples + * + * Converts an array to an Observable + * + * ```ts + * import { from } from 'rxjs'; + * + * const array = [10, 20, 30]; + * const result = from(array); + * + * result.subscribe(x => console.log(x)); + * + * // Logs: + * // 10 + * // 20 + * // 30 + * ``` + * + * Convert an infinite iterable (from a generator) to an Observable + * + * ```ts + * import { from, take } from 'rxjs'; + * + * function* generateDoubles(seed) { + * let i = seed; + * while (true) { + * yield i; + * i = 2 * i; // double it + * } + * } + * + * const iterator = generateDoubles(3); + * const result = from(iterator).pipe(take(10)); + * + * result.subscribe(x => console.log(x)); + * + * // Logs: + * // 3 + * // 6 + * // 12 + * // 24 + * // 48 + * // 96 + * // 192 + * // 384 + * // 768 + * // 1536 + * ``` + * + * @see {@link fromEvent} + * @see {@link fromEventPattern} + * @see {@link scheduled} + * + * @param input A subscription object, a Promise, an Observable-like, + * an Array, an iterable, async iterable, or an array-like object to be converted. + */ + +export function from>(input: O): Observable>; +export function from(input: ObservableInput): Observable { + const type = getObservableInputType(input); + switch (type) { + case ObservableInputType.Own: + return input as Observable; + case ObservableInputType.InteropObservable: + return fromInteropObservable(input); + case ObservableInputType.ArrayLike: + return fromArrayLike(input as ArrayLike); + case ObservableInputType.Promise: + return fromPromise(input as PromiseLike); + case ObservableInputType.AsyncIterable: + return fromAsyncIterable(input as AsyncIterable); + case ObservableInputType.Iterable: + return fromIterable(input as Iterable); + case ObservableInputType.ReadableStreamLike: + return fromReadableStreamLike(input as ReadableStreamLike); + } +} + +/** + * Creates an RxJS Observable from an object that implements `Symbol.observable`. + * @param obj An object that properly implements `Symbol.observable`. + */ +function fromInteropObservable(obj: any) { + return new Observable((subscriber: Subscriber) => { + const obs = obj[Symbol.observable ?? '@@observable'](); + if (isFunction(obs.subscribe)) { + return obs.subscribe(subscriber); + } + // Should be caught by observable subscribe function error handling. + throw new TypeError('Provided object does not correctly implement Symbol.observable'); + }); +} + +/** + * Synchronously emits the values of an array like and completes. + * This is exported because there are creation functions and operators that need to + * make direct use of the same logic, and there's no reason to make them run through + * `from` conditionals because we *know* they're dealing with an array. + * @param array The array to emit values from + */ +export function fromArrayLike(array: ArrayLike) { + return new Observable((subscriber: Subscriber) => { + subscribeToArray(array, subscriber); + }); +} + +export function fromPromise(promise: PromiseLike) { + return new Observable((subscriber: Subscriber) => { + promise + .then( + (value) => { + if (!subscriber.closed) { + subscriber.next(value); + subscriber.complete(); + } + }, + (err: any) => subscriber.error(err) + ) + .then(null, reportUnhandledError); + }); +} + +function fromIterable(iterable: Iterable) { + return new Observable((subscriber: Subscriber) => { + for (const value of iterable) { + subscriber.next(value); + if (subscriber.closed) { + return; + } + } + subscriber.complete(); + }); +} + +function fromAsyncIterable(asyncIterable: AsyncIterable) { + return new Observable((subscriber: Subscriber) => { + process(asyncIterable, subscriber).catch((err) => subscriber.error(err)); + }); +} + +function fromReadableStreamLike(readableStream: ReadableStreamLike) { + return fromAsyncIterable(readableStreamLikeToAsyncGenerator(readableStream)); +} + +async function process(asyncIterable: AsyncIterable, subscriber: Subscriber) { + for await (const value of asyncIterable) { + subscriber.next(value); + // A side-effect may have closed our subscriber, + // check before the next iteration. + if (subscriber.closed) { + return; + } + } + subscriber.complete(); +} + +/** + * Subscribes to an ArrayLike with a subscriber + * @param array The array or array-like to subscribe to + * @param subscriber + */ +export function subscribeToArray(array: ArrayLike, subscriber: Subscriber) { + // Loop over the array and emit each value. Note two things here: + // 1. We're making sure that the subscriber is not closed on each loop. + // This is so we don't continue looping over a very large array after + // something like a `take`, `takeWhile`, or other synchronous unsubscription + // has already unsubscribed. + // 2. In this form, reentrant code can alter that array we're looping over. + // This is a known issue, but considered an edge case. The alternative would + // be to copy the array before executing the loop, but this has + // performance implications. + const length = array.length; + for (let i = 0; i < length; i++) { + if (subscriber.closed) { + return; + } + subscriber.next(array[i]); + } + subscriber.complete(); +} + +/** + * Tests to see if the object is an RxJS {@link Observable} + * @param obj the object to test + */ +export function isObservable(obj: any): obj is Observable { + // The !! is to ensure that this publicly exposed function returns + // `false` if something like `null` or `0` is passed. + return !!obj && (obj instanceof Observable || (isFunction(obj.lift) && isFunction(obj.subscribe))); +} + +// Devs are more likely to pass null or undefined than they are a scheduler +// without accompanying values. To make things easier for (naughty) devs who +// use the `strictNullChecks: false` TypeScript compiler option, these +// overloads with explicit null and undefined values are included. + +export function of(value: null): Observable; +export function of(value: undefined): Observable; + +export function of(): Observable; +export function of(value: T): Observable; +export function of(...values: A): Observable>; + +/** + * Converts the arguments to an observable sequence. + * + * Each argument becomes a `next` notification. + * + * ![](of.png) + * + * Unlike {@link from}, it does not do any flattening and emits each argument in whole + * as a separate `next` notification. + * + * ## Examples + * + * Emit the values `10, 20, 30` + * + * ```ts + * import { of } from 'rxjs'; + * + * of(10, 20, 30) + * .subscribe({ + * next: value => console.log('next:', value), + * error: err => console.log('error:', err), + * complete: () => console.log('the end'), + * }); + * + * // Outputs + * // next: 10 + * // next: 20 + * // next: 30 + * // the end + * ``` + * + * Emit the array `[1, 2, 3]` + * + * ```ts + * import { of } from 'rxjs'; + * + * of([1, 2, 3]) + * .subscribe({ + * next: value => console.log('next:', value), + * error: err => console.log('error:', err), + * complete: () => console.log('the end'), + * }); + * + * // Outputs + * // next: [1, 2, 3] + * // the end + * ``` + * + * @see {@link from} + * @see {@link range} + * + * @param values A comma separated list of arguments you want to be emitted. + * @return An Observable that synchronously emits the arguments described + * above and then immediately completes. + */ +export function of(...values: T[]): Observable { + return fromArrayLike(values); +} diff --git a/packages/observable/src/internal/getObservableInputType.ts b/packages/observable/src/internal/getObservableInputType.ts new file mode 100644 index 0000000000..aca4edb67f --- /dev/null +++ b/packages/observable/src/internal/getObservableInputType.ts @@ -0,0 +1,48 @@ +import { isInteropObservable, isArrayLike, isPromise, isAsyncIterable, isIterable, isReadableStreamLike } from './utils.js'; +import { Observable } from './Observable.js'; + +export enum ObservableInputType { + Own, + InteropObservable, + ArrayLike, + Promise, + AsyncIterable, + Iterable, + ReadableStreamLike, +} + +/** + * Examines a value and returns an enum indicating what type of ObservableInput it is. + * This exists because both {@link from} and {@link scheduled} need to perform this same + * logical check in the same order. + * @param input a value we want to check to see if it can be converted to an observable + * @returns An enum value indicating the type of conversion that can be done. + */ +export function getObservableInputType(input: unknown): ObservableInputType { + if (input instanceof Observable) { + return ObservableInputType.Own; + } + if (isInteropObservable(input)) { + return ObservableInputType.InteropObservable; + } + if (isArrayLike(input)) { + return ObservableInputType.ArrayLike; + } + if (isPromise(input)) { + return ObservableInputType.Promise; + } + if (isAsyncIterable(input)) { + return ObservableInputType.AsyncIterable; + } + if (isIterable(input)) { + return ObservableInputType.Iterable; + } + if (isReadableStreamLike(input)) { + return ObservableInputType.ReadableStreamLike; + } + throw new TypeError( + `You provided ${ + input !== null && typeof input === 'object' ? 'an invalid object' : `'${input}'` + } where a stream was expected. You can provide an Observable, Promise, ReadableStream, Array, AsyncIterable, or Iterable.` + ); +} diff --git a/packages/observable/src/internal/types.ts b/packages/observable/src/internal/types.ts new file mode 100644 index 0000000000..11c008f670 --- /dev/null +++ b/packages/observable/src/internal/types.ts @@ -0,0 +1,157 @@ +// https://github.com/microsoft/TypeScript/issues/40462#issuecomment-689879308 +/// + +import type { Observable, Subscription } from './Observable.js'; + +/** + * Note: This will add Symbol.observable globally for all TypeScript users, + * however, we are no longer polyfilling Symbol.observable + */ +declare global { + interface SymbolConstructor { + readonly observable: symbol; + } +} + +/* OPERATOR INTERFACES */ + +/** + * A function type interface that describes a function that accepts one parameter `T` + * and returns another parameter `R`. + * + * Usually used to describe {@link OperatorFunction} - it always takes a single + * parameter (the source Observable) and returns another Observable. + */ +export interface UnaryFunction { + (source: T): R; +} + +export interface OperatorFunction extends UnaryFunction, Observable> {} + +/* SUBSCRIPTION INTERFACES */ + +export interface Unsubscribable { + unsubscribe(): void; +} + +export type TeardownLogic = Subscription | Unsubscribable | (() => void) | void; + +export interface SubscriptionLike extends Unsubscribable { + unsubscribe(): void; + readonly closed: boolean; +} + +/* OBSERVABLE INTERFACES */ + +export interface Subscribable { + subscribe(observer: Partial>): Unsubscribable; +} + +/** + * Valid types that can be converted to observables. + */ +export type ObservableInput = + | Observable + | InteropObservable + | AsyncIterable + | PromiseLike + | ArrayLike + | Iterable + | ReadableStreamLike; + +/** + * An object that implements the `Symbol.observable` interface. + */ +export interface InteropObservable { + [Symbol.observable]: () => Subscribable; +} + +/* NOTIFICATIONS */ + +/** + * A notification representing a "next" from an observable. + * Can be used with {@link dematerialize}. + */ +export interface NextNotification { + /** The kind of notification. Always "N" */ + kind: 'N'; + /** The value of the notification. */ + value: T; +} + +/** + * A notification representing an "error" from an observable. + * Can be used with {@link dematerialize}. + */ +export interface ErrorNotification { + /** The kind of notification. Always "E" */ + kind: 'E'; + error: any; +} + +/** + * A notification representing a "completion" from an observable. + * Can be used with {@link dematerialize}. + */ +export interface CompleteNotification { + kind: 'C'; +} + +/** + * Valid observable notification types. + */ +export type ObservableNotification = NextNotification | ErrorNotification | CompleteNotification; + +/** + * An object interface that defines a set of callback functions a user can use to get + * notified of any set of {@link Observable} + * {@link guide/glossary-and-semantics#notification notification} events. + * + * For more info, please refer to {@link guide/observer this guide}. + */ +export interface Observer { + /** + * A callback function that gets called by the producer during the subscription when + * the producer "has" the `value`. It won't be called if `error` or `complete` callback + * functions have been called, nor after the consumer has unsubscribed. + * + * For more info, please refer to {@link guide/glossary-and-semantics#next this guide}. + */ + next: (value: T) => void; + /** + * A callback function that gets called by the producer if and when it encountered a + * problem of any kind. The errored value will be provided through the `err` parameter. + * This callback can't be called more than one time, it can't be called if the + * `complete` callback function have been called previously, nor it can't be called if + * the consumer has unsubscribed. + * + * For more info, please refer to {@link guide/glossary-and-semantics#error this guide}. + */ + error: (err: any) => void; + /** + * A callback function that gets called by the producer if and when it has no more + * values to provide (by calling `next` callback function). This means that no error + * has happened. This callback can't be called more than one time, it can't be called + * if the `error` callback function have been called previously, nor it can't be called + * if the consumer has unsubscribed. + * + * For more info, please refer to {@link guide/glossary-and-semantics#complete this guide}. + */ + complete: () => void; +} + +/** + * Extracts the type from an `ObservableInput`. If you have + * `O extends ObservableInput` and you pass in `Observable`, or + * `Promise`, etc, it will type as `number`. + */ +export type ObservedValueOf = O extends ObservableInput ? T : never; + +/** + * The base signature RxJS will look for to identify and use + * a [ReadableStream](https://streams.spec.whatwg.org/#rs-class) + * as an {@link ObservableInput} source. + */ +export type ReadableStreamLike = Pick, 'getReader'>; + +export type ValueFromArray = T extends ArrayLike ? R : never; diff --git a/packages/observable/src/internal/utils.ts b/packages/observable/src/internal/utils.ts new file mode 100644 index 0000000000..bd6f6d4e27 --- /dev/null +++ b/packages/observable/src/internal/utils.ts @@ -0,0 +1,103 @@ +import type { ReadableStreamLike, InteropObservable, CompleteNotification, ErrorNotification, NextNotification } from './types.js'; + +export function isAsyncIterable(obj: any): obj is AsyncIterable { + return Symbol.asyncIterator && isFunction(obj?.[Symbol.asyncIterator]); +} + +/** + * Returns true if the object is a function. + * @param value The value to check + */ +export function isFunction(value: any): value is (...args: any[]) => any { + return typeof value === 'function'; +} + +export function isReadableStreamLike(obj: any): obj is ReadableStreamLike { + // We don't want to use instanceof checks because they would return + // false for instances from another Realm, like an