import { createOperatorSubscriber } from '../../operators/OperatorSubscriber'; import { Observable } from '../../Observable'; import { innerFrom } from '../../observable/innerFrom'; import { ObservableInput } from '../../types'; export function fromFetch( input: string | Request, init: RequestInit & { selector: (response: Response) => ObservableInput; } ): Observable; export function fromFetch(input: string | Request, init?: RequestInit): Observable; /** * Uses [the Fetch API](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API) to * make an HTTP request. * * **WARNING** Parts of the fetch API are still experimental. `AbortController` is * required for this implementation to work and use cancellation appropriately. * * Will automatically set up an internal [AbortController](https://developer.mozilla.org/en-US/docs/Web/API/AbortController) * in order to finalize the internal `fetch` when the subscription tears down. * * If a `signal` is provided via the `init` argument, it will behave like it usually does with * `fetch`. If the provided `signal` aborts, the error that `fetch` normally rejects with * in that scenario will be emitted as an error from the observable. * * ## Examples * * Basic use * * ```ts * import { fromFetch } from 'rxjs/fetch'; * import { switchMap, of, catchError } from 'rxjs'; * * const data$ = fromFetch('https://api.github.com/users?per_page=5').pipe( * switchMap(response => { * if (response.ok) { * // OK return data * return response.json(); * } else { * // Server is returning a status requiring the client to try something else. * return of({ error: true, message: `Error ${ response.status }` }); * } * }), * catchError(err => { * // Network or other error, handle appropriately * console.error(err); * return of({ error: true, message: err.message }) * }) * ); * * data$.subscribe({ * next: result => console.log(result), * complete: () => console.log('done') * }); * ``` * * ### Use with Chunked Transfer Encoding * * With HTTP responses that use [chunked transfer encoding](https://tools.ietf.org/html/rfc7230#section-3.3.1), * the promise returned by `fetch` will resolve as soon as the response's headers are * received. * * That means the `fromFetch` observable will emit a `Response` - and will * then complete - before the body is received. When one of the methods on the * `Response` - like `text()` or `json()` - is called, the returned promise will not * resolve until the entire body has been received. Unsubscribing from any observable * that uses the promise as an observable input will not abort the request. * * To facilitate aborting the retrieval of responses that use chunked transfer encoding, * a `selector` can be specified via the `init` parameter: * * ```ts * import { of } from 'rxjs'; * import { fromFetch } from 'rxjs/fetch'; * * const data$ = fromFetch('https://api.github.com/users?per_page=5', { * selector: response => response.json() * }); * * data$.subscribe({ * next: result => console.log(result), * complete: () => console.log('done') * }); * ``` * * @param input The resource you would like to fetch. Can be a url or a request object. * @param initWithSelector A configuration object for the fetch. * [See MDN for more details](https://developer.mozilla.org/en-US/docs/Web/API/WindowOrWorkerGlobalScope/fetch#Parameters) * @returns An Observable, that when subscribed to, performs an HTTP request using the native `fetch` * function. The {@link Subscription} is tied to an `AbortController` for the fetch. */ export function fromFetch( input: string | Request, initWithSelector: RequestInit & { selector?: (response: Response) => ObservableInput; } = {} ): Observable { const { selector, ...init } = initWithSelector; return new Observable((subscriber) => { // Our controller for aborting this fetch. // Any externally provided AbortSignal will have to call // abort on this controller when signaled, because the // signal from this controller is what is being passed to `fetch`. const controller = new AbortController(); const { signal } = controller; // This flag exists to make sure we don't `abort()` the fetch upon tearing down // this observable after emitting a Response. Aborting in such circumstances // would also abort subsequent methods - like `json()` - that could be called // on the Response. Consider: `fromFetch().pipe(take(1), mergeMap(res => res.json()))` let abortable = true; // If the user provided an init configuration object, // let's process it and chain our abort signals, if necessary. // If a signal is provided, just have it finalized. It's a cancellation token, basically. const { signal: outerSignal } = init; if (outerSignal) { if (outerSignal.aborted) { controller.abort(); } else { // We got an AbortSignal from the arguments passed into `fromFetch`. // We need to wire up our AbortController to abort when this signal aborts. const outerSignalHandler = () => { if (!signal.aborted) { controller.abort(); } }; outerSignal.addEventListener('abort', outerSignalHandler); subscriber.add(() => outerSignal.removeEventListener('abort', outerSignalHandler)); } } // The initialization object passed to `fetch` as the second // argument. This ferries in important information, including our // AbortSignal. Create a new init, so we don't accidentally mutate the // passed init, or reassign it. This is because the init passed in // is shared between each subscription to the result. const perSubscriberInit: RequestInit = { ...init, signal }; const handleError = (err: any) => { abortable = false; subscriber.error(err); }; fetch(input, perSubscriberInit) .then((response) => { if (selector) { // If we have a selector function, use it to project our response. // Note that any error that comes from our selector will be // sent to the promise `catch` below and handled. innerFrom(selector(response)).subscribe( createOperatorSubscriber( subscriber, // Values are passed through to the subscriber undefined, // The projected response is complete. () => { abortable = false; subscriber.complete(); }, handleError ) ); } else { abortable = false; subscriber.next(response); subscriber.complete(); } }) .catch(handleError); return () => { if (abortable) { controller.abort(); } }; }); }