Opérateurs flat map dans RxJS

mergeMap, concatMap, switchMap, exhaustMap sont dans la famille des flat maps, dans le sens où il permettent la transformation des éléments de la source en observables, puis ils applatissent le résultat en enlevant une couche d’observable.

Pour illustrer les différences entre les quatre opérateurs, j’ai réutilisé le même exemple.

L’observable source produit un élément chaque seconde, pendant un peu plus de six secondes, soit six éléments.

export function source() {
  return interval(1000).pipe(
    takeUntil(timer(6100))
  );
}

Pour chaque élément, il est produit un sous-observable qui produit un élément tous les six dixièmes de seconde pendant deux secondes et demi, soit quatre éléments.

export function inner(prefix: any) {
  return interval(600).pipe(
    takeUntil(timer(2500)),
    map((item) => `${prefix}/${item}\`)
  );
}

Ces observables sont associés via les diffrents opérateurs détaillés ci-dessous.

concatMap

Pour chaque élément de l’observable source, on crée un nouvel observable.

Les sous-observables sont exécutés de façon séquentielle. Chaque nouveau sous-observable attend que le précédent soit terminé avec de démarrer.

--A--------B--------C-|-----------------

--O--1--2-|--

concatMap

--AO--A1--A2-BO--B1--B2-CO--C1--C2-|-

Exemple

source()
  .pipe(concatMap((item) => inner(item)))
  .subscribe((response) => console.log(response));

Résultat :

0/0, 0/1, 0/2, 0/3, END inner,
1/0, 1/1, 1/2, 1/3, END inner, END source,
2/0, 2/1, 2/2, 2/3, END inner,
3/0, 3/1, 3/2, 3/3, END inner,
4/0, 4/1, 4/2, 4/3, END inner,
5/0, 5/1, 5/2, 5/3, END inner

On a les 24 résultats attendus, dans l’ordre. Cet ordre a un prix, le traitement complet dûre plus longtemps que pour mergeMap.

mergeMap

Le fonctionnement est similaire à concatMap, mais sans attendre la complétion du précédent sous-observable.

Son ancien nom était flatMap.

--A--------B--------C-|---------------

--O--1--2-|--

mergeMap

--AO--A1--BO-A2-B1-CO-B2-C1---C2-|-

Exemple

source()
  .pipe(mergeMap((item) => inner(item)))
  .subscribe((response) => console.log(response));

Résultat :

0/0, 0/1, 1/0, 0/2, 1/1, 0/3, END inner,
2/0, 1/2, 2/1, 1/3, END inner,
3/0, 2/2, 3/1, 2/3, END inner,
4/0, 3/2, END source, 4/1, 3/3, END inner
5/0, 4/2, 5/1, 4/3, END inner,
5/2, 5/3, END inner

On a bien les 24 résultats attendus. On constate aussi qu’ils ne sont pas séquentiels.

switchMap

Pour chaque élément de l’observable source, on crée un nouvel observable. Le nouvel observable est interrompu à l’arrivée de chaque nouvel élément.

--A--------B--------C-|-------------

--O---1---2-|--

switchMap

--AO---A1--BO---B1--CO---C1---C2-|-

Exemple

source()
  .pipe(switchMap((item) => inner(item)))
  .subscribe((response) => console.log(response));

Résultat :

0/0, END inner,
1/0, END inner,
2/0, END inner,
3/0, END inner,
4/0, END inner, END source,
5/0, 5/1, 5/2, 5/3, END inner

Les premiers sous-observables produisent un élément puis sont interrompus par une nouvel élément. Seul le dernier sous-observable produit tous ses éléments.

exhaustMap

Pour chaque élément de l’observable source, on crée un nouvel observable, ou presque. Les éléments produits par la source sont ignorés tant que le sous-observable n’est pas terminé.

--A--------B--------C-|-------------

--O---1---2-|--

exhaustMap

--AO---A1---A2-------CO---C1---C2-|-

Exemple

source()
  .pipe(exhaustMap((item) => inner(item)))
  .subscribe((response) => console.log(response));

Résultat :

0/0, 0/1, 0/2, 0/3, END inner,
3/0, 3/1, 3/2, END source, 3/3, END inner

Le premier sous-observable produit ses éléments. Comme il n’a pas fini son travail à l’arrivée du deuxième élément de la source, celui-ci est ignoré. Quand le troisième arrive, il est pris en charge car le travail est terminé.