import { Operator } from './Operator';
import { Observable } from './Observable';
import { Subscriber } from './Subscriber';
import { Subscription, EMPTY_SUBSCRIPTION } from './Subscription';
import { Observer, SubscriptionLike, TeardownLogic } from './types';
import { ObjectUnsubscribedError } from './util/ObjectUnsubscribedError';
import { arrRemove } from './util/arrRemove';
import { errorContext } from './util/errorContext';

/**
 * A Subject is a special type of Observable that allows values to be
 * multicasted to many Observers. Subjects are like EventEmitters.
 *
 * Every Subject is an Observable and an Observer. You can subscribe to a
 * Subject, and you can call next to feed values as well as error and complete.
 */
export class Subject<T> extends Observable<T> implements SubscriptionLike {
  /** Set to `true` by {@link unsubscribe}; once set, `_throwIfClosed` throws. */
  closed = false;

  /**
   * Copy of `observers` that `next` iterates over. It is created lazily on the
   * first `next` call after it was reset, and reset to `null` whenever an
   * observer is added or removed, so the registered list can change while a
   * notification loop is still walking the previous copy.
   */
  private currentObservers: Observer<T>[] | null = null;

  /** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */
  observers: Observer<T>[] = [];
  /** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */
  isStopped = false;
  /** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */
  hasError = false;
  /** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */
  thrownError: any = null;

  /**
   * Creates a "subject" by basically gluing an observer to an observable.
   *
   * @nocollapse
   * @deprecated Recommended you do not use. Will be removed at some point in the future. Plans for replacement still under discussion.
   */
  static create: (...args: any[]) => any = <T>(destination: Observer<T>, source: Observable<T>): AnonymousSubject<T> => {
    return new AnonymousSubject<T>(destination, source);
  };

  constructor() {
    // NOTE: This must be here to obscure Observable's constructor.
    super();
  }

  /**
   * Returns an {@link AnonymousSubject} that uses this Subject as both its
   * destination and its source, with `operator` attached to it.
   *
   * @deprecated Internal implementation detail, do not use directly. Will be made internal in v8.
   */
  lift<R>(operator: Operator<T, R>): Observable<R> {
    const subject = new AnonymousSubject(this, this);
    subject.operator = operator as any;
    return subject as any;
  }

  /**
   * Guard used by the public entry points.
   *
   * @throws ObjectUnsubscribedError when `closed` is `true`.
   * @internal
   */
  protected _throwIfClosed() {
    if (this.closed) {
      throw new ObjectUnsubscribedError();
    }
  }

  /**
   * Delivers `value` to every currently registered observer. Does nothing when
   * the Subject is already stopped.
   *
   * The body runs inside the imported `errorContext` helper (its exact
   * semantics are defined in `./util/errorContext`).
   *
   * @param value The value to multicast.
   * @throws ObjectUnsubscribedError when the Subject has been unsubscribed.
   */
  next(value: T) {
    errorContext(() => {
      this._throwIfClosed();
      if (!this.isStopped) {
        // Iterate over a copy so observers added or removed during delivery
        // do not disturb this loop; the copy is reused until it is reset.
        if (!this.currentObservers) {
          this.currentObservers = Array.from(this.observers);
        }
        for (const observer of this.currentObservers) {
          observer.next(value);
        }
      }
    });
  }

  /**
   * Marks the Subject as stopped and errored, remembers `err` in
   * `thrownError`, and forwards `err` to every registered observer. Does
   * nothing when the Subject is already stopped.
   *
   * @param err The error to forward.
   * @throws ObjectUnsubscribedError when the Subject has been unsubscribed.
   */
  error(err: any) {
    errorContext(() => {
      this._throwIfClosed();
      if (!this.isStopped) {
        this.hasError = this.isStopped = true;
        this.thrownError = err;
        const { observers } = this;
        // Drain the list: each observer is removed before it is notified.
        while (observers.length) {
          observers.shift()!.error(err);
        }
      }
    });
  }

  /**
   * Marks the Subject as stopped and sends `complete` to every registered
   * observer. Does nothing when the Subject is already stopped.
   *
   * @throws ObjectUnsubscribedError when the Subject has been unsubscribed.
   */
  complete() {
    errorContext(() => {
      this._throwIfClosed();
      if (!this.isStopped) {
        this.isStopped = true;
        const { observers } = this;
        // Drain the list: each observer is removed before it is notified.
        while (observers.length) {
          observers.shift()!.complete();
        }
      }
    });
  }

  /**
   * Closes the Subject: sets `isStopped` and `closed` to `true` and drops both
   * observer lists. Observers are not notified.
   */
  unsubscribe() {
    this.isStopped = this.closed = true;
    this.observers = this.currentObservers = null!;
  }

  /**
   * `true` when at least one observer is registered. The optional chaining
   * covers the case where `unsubscribe` has replaced `observers` with `null`.
   */
  get observed() {
    return this.observers?.length > 0;
  }

  /**
   * Rejects the subscription when the Subject is closed, otherwise defers to
   * the base class implementation.
   *
   * @internal
   */
  protected _trySubscribe(subscriber: Subscriber<T>): TeardownLogic {
    this._throwIfClosed();
    return super._trySubscribe(subscriber);
  }

  /**
   * Subscribes `subscriber`: throws if closed, replays a terminal state if one
   * was already reached, then registers the subscriber.
   *
   * @internal
   */
  protected _subscribe(subscriber: Subscriber<T>): Subscription {
    this._throwIfClosed();
    this._checkFinalizedStatuses(subscriber);
    return this._innerSubscribe(subscriber);
  }

  /**
   * Registers `subscriber` in `observers`, unless the Subject has errored or
   * stopped, in which case `EMPTY_SUBSCRIPTION` is returned.
   *
   * @returns A Subscription whose teardown removes `subscriber` from the
   * observer list captured at subscription time.
   * @internal
   */
  protected _innerSubscribe(subscriber: Subscriber<any>) {
    const { hasError, isStopped, observers } = this;
    if (hasError || isStopped) {
      return EMPTY_SUBSCRIPTION;
    }
    // Reset the copy used by `next` so the next emission sees this subscriber.
    this.currentObservers = null;
    observers.push(subscriber);
    return new Subscription(() => {
      // Reset again so the removed subscriber is dropped from later emissions.
      this.currentObservers = null;
      arrRemove(observers, subscriber);
    });
  }

  /**
   * Replays the terminal notification to a late subscriber: `error` with the
   * stored `thrownError` if the Subject errored, otherwise `complete` if it is
   * stopped. Does nothing while the Subject is still active.
   *
   * @internal
   */
  protected _checkFinalizedStatuses(subscriber: Subscriber<any>) {
    const { hasError, thrownError, isStopped } = this;
    if (hasError) {
      subscriber.error(thrownError);
    } else if (isStopped) {
      subscriber.complete();
    }
  }

  /**
   * Creates a new Observable with this Subject as the source. You can do this
   * to create custom Observer-side logic of the Subject and conceal it from
   * code that uses the Observable.
   *
   * @returns A new Observable whose `source` is this Subject.
   */
  asObservable(): Observable<T> {
    const observable: any = new Observable<T>();
    observable.source = this;
    return observable;
  }
}

/**
 * A Subject that does not multicast itself: `next`, `error` and `complete` are
 * forwarded to an optional `destination` observer, and subscriptions are
 * delegated to an optional `source` observable.
 *
 * @class AnonymousSubject
 */
export class AnonymousSubject<T> extends Subject<T> {
  constructor(
    /** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */
    public destination?: Observer<T>,
    source?: Observable<T>
  ) {
    super();
    this.source = source;
  }

  /** Forwards `value` to `destination.next` when both exist; otherwise a no-op. */
  next(value: T) {
    this.destination?.next?.(value);
  }

  /** Forwards `err` to `destination.error` when both exist; otherwise a no-op. */
  error(err: any) {
    this.destination?.error?.(err);
  }

  /** Calls `destination.complete` when both exist; otherwise a no-op. */
  complete() {
    this.destination?.complete?.();
  }

  /**
   * Subscribes `subscriber` to `source`, or returns `EMPTY_SUBSCRIPTION` when
   * there is no source (or the source's `subscribe` yields a nullish value).
   *
   * @internal
   */
  protected _subscribe(subscriber: Subscriber<T>): Subscription {
    return this.source?.subscribe(subscriber) ?? EMPTY_SUBSCRIPTION;
  }
}