import { ObservableInput, ObservedValueOf, OperatorFunction } from '../types';
import { switchMap } from './switchMap';
import { operate } from '../util/lift';
// TODO: Generate a marble diagram for these docs.
/**
* Applies an accumulator function over the source Observable where the
* accumulator function itself returns an Observable, emitting values
* only from the most recently returned Observable.
*
* It's like {@link mergeScan}, but only the most recent
* Observable returned by the accumulator is merged into the outer Observable.
*
* @see {@link scan}
* @see {@link mergeScan}
* @see {@link switchMap}
*
* @param accumulator
* The accumulator function called on each source value.
* @param seed The initial accumulation value.
* @return A function that returns an observable of the accumulated values.
*/
export function switchScan>(
accumulator: (acc: R, value: T, index: number) => O,
seed: R
): OperatorFunction> {
return operate((source, subscriber) => {
// The state we will keep up to date to pass into our
// accumulator function at each new value from the source.
let state = seed;
// Use `switchMap` on our `source` to do the work of creating
// this operator. Note the backwards order here of `switchMap()(source)`
// to avoid needing to use `pipe` unnecessarily
switchMap(
// On each value from the source, call the accumulator with
// our previous state, the value and the index.
(value: T, index) => accumulator(state, value, index),
// Using the deprecated result selector here as a dirty trick
// to update our state with the flattened value.
(_, innerValue) => ((state = innerValue), innerValue)
)(source).subscribe(subscriber);
return () => {
// Release state on finalization
state = null!;
};
});
}