import { Subject } from '../Subject'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; import { refCount as higherOrderRefCount } from '../operators/refCount'; import { createOperatorSubscriber } from '../operators/OperatorSubscriber'; import { hasLift } from '../util/lift'; /** * @class ConnectableObservable * @deprecated Will be removed in v8. Use {@link connectable} to create a connectable observable. * If you are using the `refCount` method of `ConnectableObservable`, use the {@link share} operator * instead. * Details: https://rxjs.dev/deprecations/multicasting */ export class ConnectableObservable extends Observable { protected _subject: Subject | null = null; protected _refCount: number = 0; protected _connection: Subscription | null = null; /** * @param source The source observable * @param subjectFactory The factory that creates the subject used internally. * @deprecated Will be removed in v8. Use {@link connectable} to create a connectable observable. * `new ConnectableObservable(source, factory)` is equivalent to * `connectable(source, { connector: factory })`. * When the `refCount()` method is needed, the {@link share} operator should be used instead: * `new ConnectableObservable(source, factory).refCount()` is equivalent to * `source.pipe(share({ connector: factory }))`. * Details: https://rxjs.dev/deprecations/multicasting */ constructor(public source: Observable, protected subjectFactory: () => Subject) { super(); // If we have lift, monkey patch that here. This is done so custom observable // types will compose through multicast. Otherwise the resulting observable would // simply be an instance of `ConnectableObservable`. if (hasLift(source)) { this.lift = source.lift; } } /** @internal */ protected _subscribe(subscriber: Subscriber) { return this.getSubject().subscribe(subscriber); } protected getSubject(): Subject { const subject = this._subject; if (!subject || subject.isStopped) { this._subject = this.subjectFactory(); } return this._subject!; } protected _teardown() { this._refCount = 0; const { _connection } = this; this._subject = this._connection = null; _connection?.unsubscribe(); } /** * @deprecated {@link ConnectableObservable} will be removed in v8. Use {@link connectable} instead. * Details: https://rxjs.dev/deprecations/multicasting */ connect(): Subscription { let connection = this._connection; if (!connection) { connection = this._connection = new Subscription(); const subject = this.getSubject(); connection.add( this.source.subscribe( createOperatorSubscriber( subject as any, undefined, () => { this._teardown(); subject.complete(); }, (err) => { this._teardown(); subject.error(err); }, () => this._teardown() ) ) ); if (connection.closed) { this._connection = null; connection = Subscription.EMPTY; } } return connection; } /** * @deprecated {@link ConnectableObservable} will be removed in v8. Use the {@link share} operator instead. * Details: https://rxjs.dev/deprecations/multicasting */ refCount(): Observable { return higherOrderRefCount()(this) as Observable; } }