Principaux opérateurs de RxJS

Création d’un observable

Hors opérateurs

Dans Angular, les observables viennent souvent du client HTTP.

const product$ = this.httpClient.get<Product>(
    ${API_v1_URL}/product/${id}
  );

Ils sont parfois créés sous la forme de Subject, pour pouvoir appeler la méthode next plus tard. La classe Subject<?> hérite de la classe Observable<?>.

const reload$ = new Subject<boolean>();
...
reload$.next(true);

from

from crée un observable à partir d’un table

from([1, 2, 3, 4, 5])

--1---2---3---4---5--|--

from([1, 2, 3, 4, 5])
    .subscribe(val => console.log(val));

of

of crée un observable à partir d’une série de valeurs

of(1, 2, 3, 4, 5)

--1---2---3---4---5--|--

of(1, 2, 3, 4, 5)
    .subscribe(val => console.log(val));

ajax

ajax envoie une requête HTTP. C’est ce qui se rapproche le plus de ce qu’on fait avec Angular.

ajax('https://gitlab.com/api/v4/projects')
    .subscribe(val => console.log(val));

Filtrage

Les opérateur de filtre sont importés de rxjs/operators doivent être appelé dans pipe(…​). Ils réduisent le nombre d’éléments, mais n’arrêtent pas l’observable.

filter

filter exlut les éléments qui ne satisfont pas au prédicat.

--1---2---3---4---5--|--

filter(val ⇒ val % 2 === 0)

--------2---------4--------|--

of(1, 2, 3, 4, 5)
  .pipe(filter((val) => val % 2 === 0))
  .subscribe((val) => console.log(val));

ignoreElements serait une version extrème de filter puisqu’il exclut tous les éléments. On l’utilise quand on ne veut s’intéresser qu’à l’arrêt, normal ou en erreur.

--1---2---3---4---5--|--

ignoreElements()

-------------------------------|--

of(1, 2, 3, 4, 5)
  .pipe(ignoreElements())
  .subscribe({
    next: (val) => console.log('next should never happen', val),
    complete: () => console.log('complete'),
  });

skip, skipUntil, skipWhile

skip ignore les premiers éléments émits. C’est le contraire de take. skipUntil est le contraire de takeUntil, il ignore tous les éléments tant que l’observable secondaire n’a rien émit. Enfin skipWhile est le contraire de takeWhile, il ignore tous les éléments tant que l’observable secondaire émet.

--1---2---3---4---5--|--

skip(2)

--------------3---4---5--|--

of(1, 2, 3, 4, 5)
  .pipe(skip(2)
  .subscribe((val) => console.log(val));

debounce, debounceTime

Avec debounceTime, l’observable élimine les éléments qui arrivent trop vite. Le compteur de temps est remis à zéro à chaque élément, le suivant n’est émis que s’il arrive après le délai spécifié. Ça ressemble un peu à un timeout à l’envers. Avec debounce, la durée est contrôlée par un observable secondaire, ce qui lui permet de varier dans le temps.

--1------2-3--4--------5--|-

debounceTime(100)

--------1----------------4------|-

interval(100)
  .pipe(
    concatMap(() => timer(Math.random() * 1000)),
    take(20),
    debounceTime(500)
  )
  .subscribe((val) => console.log('ok'));

timeout

Avec timeout, si la source n’émet aucun élément pendant le délai, le flux est arrêté en erreur.

--1-2-3---------5-----|-

timeout(200)

--1-2-3------X-----------

Pour faire un timeout sans erreur, on peut l’associer à un catchError.

interval(100)
  .pipe(
    concatMap(() => timer(Math.random() * 1000)),
    take(20),
    timeout(300),
    catchError((error) => of(error.message))
  )
  .subscribe({
    next: (val) => console.log('timeout', val),
  });

distinct

distinct exclut les doublons, avec la possibilité de passer une fonction d’extraction de clé de comparaison.

--1--2--3--1--1--4--|-

distinct()

--1--2--3------------4--|-

distinctUntilChanged et distinctUntilKeyChanged éliminent aussi les doublons, mais uniquement s’ils se suivent.

--1--2--3--1--1--4--|-

distinctUntilChanged()

--1--2--3--1-------4--|-

of(1, 2, 3, 2, 3, 4)
  .pipe(distinct())
  .subscribe((val) => console.log(val));

Transformation

Les opérateur de transformation sont importés de rxjs/operators doivent être appelé dans pipe(…​).

map

map permet la transformation des éléments de la source en éléments d’un autre type.

--1--2--3--1--1--4--|-

map(x ⇒ x*2)

--2--4--6--2--2--8--|-

of(1, 2, 3, 4)
  .pipe(map(val => val * 2))
  .subscribe((val) => console.log(val));

flatMap

mergeMap, concatMap, switchMap, exhaustMap sont dans la famille des flat maps, dans le sens où il permettent la transformation des éléments de la source en observables, pour ils applatissent le résultat en enlevant une couche d’observable.

Combinaison

Les opérateur de combinaisons simples sont appelés de façon statique, avec un ensemble d’observables en paramètre. Les opérateur xxxAll() sont utilisés sur une instance dans pipe(…​).

Combinaisons statiques

concat exécute plusieurs observables en séquence.

--1--2--3--|-

--A--B--C--|-

--X--Y--|------

concat

--1--2--3--A--B--C--X--Y--|-

concat(
    of(1, 2, 3),
    of('A', 'B', 'C'),
    of('X', 'Y')
  )
  .subscribe((val) => console.log(val));

zip exécute plusieurs observables de façon concurrente, attend que tous aient produit un élément et les combine. L’observable combiné est arrêté lorsque le premier sous-observable s’arrête.

--1--2--3--|--

---A--B--C--|-

--X----Y--|-----

zip

---1AX---2BY--|-----

zip(
    of(1, 2, 3),
    of('A', 'B', 'C'),
    of('X', 'Y')
  )
  .subscribe((val) => console.log(val));

forkJoin exécute aussi plusieurs observables de façon concurrente, attend que tous soient terminés et produit un résultat avec le dernier élément de chaque observable. Il ressemble à zip lorsque tous les sous-observables ne produisent qu’un élément.

--1--2--3--|-

--A--B--C--|-

--X--Y--|-

forkJoin

-----------------3--C--Y--|-

forkJoin([
    of(1, 2, 3),
    of('A', 'B', 'C'),
    of('X', 'Y')
  ])
  .subscribe((val) =>
    console.log('forkJoin', val)
  );

L’opérateur merge exécute plusieurs observables de façon concurrente, mais sans combiner les éléments. Il les restitue simplement dans l’ordre d’arrivée.

--1--------2----3-|-----------

----A--B--C-|-----------------

---X------------Y-|-------------

merge

--1-X-A-B-2-C-Y-3-|-

merge(
    of(1, 2, 3),
    of('A', 'B', 'C'),
    of('X', 'Y'))
  .subscribe((val) => console.log(val)
);

Avec l’opérateur race seul l’observable qui produit son premier élément est utilisé. Les autres observables sont stoppés.

--1--------2----3-|-

----A--B--C-|-------

---X------------Y-|---

race

--1--------2----3-|-

race(
    of(1, 2, 3),
    of('A', 'B', 'C'),
    of('X', 'Y'))
  .subscribe((val) => console.log(val)
);

Combinaisons avec pipe(…​)

Avec withLatestFrom, chaque élément de l’observable principal est combiné avec le dernier élément produit par l’observable secondaire. Ainsi, un élément de l’observable secondaire peut apparaître plusieurs fois dans le résultat. combineAll fonctionne comme withLatestFrom avec un ensemble de sous-observables.

--1--2-3----4-|-

----A------B--C-|-

withLatestFrom

-------A2-A3----C4-|-

of(1, 2, 3)
  .pipe(withLatestFrom(of('A', 'B', 'C')))
  .subscribe((val) => console.log(val));

concatAll fonctionne comme concat. mergeAll fonctionne comme merge.

Arrêt

Les opérateurs take, takeUntil et takeWhile permettent d’interrompre l’émission d’éléments d’un observable. Ils sont importés de rxjs/operators doivent être appelé dans pipe(…​).

Avec take, on spécifie le nombre d’éléments au bout duquel l’observable arrête d’émettre. Il fonctionne à l’opposé de skip.

interval(100)
  .pipe(take(5))
  .subscribe((val) => console.log(val));

Avec takeUntil, l’observable principal arrête d’émettre lorsque l’observable secondaire commence. Ça peut être un timer, un sujet déclenché manuellement ou une autre source d’événements.

--1---2---3---4---5-|--

------------------X-|----------

takeUntil

--1---2---3--|-------------

takeWhile fonctionne avec un prédicat. Au premier élément pour lequel il n’est plus satisfait, l’observable s’arrête.

Avec first, l’observable est arrêté au premier élément, s’il n’y a pas de prédicat, ou le premier qui satisfait le prédicat. first() sans paramètre ressemble à take(1), à ceci près que first() sort en erreur s’il n’y a aucun élément. L’opérateur single a aussi des ressemblances avec first, mais il sort aussi en erreur s’il y a plusieurs éléments qui satisfont au prédicat. Enfin, find ressemble aussi à first(…​) avec prédicat.

--1---2---3---4---5-|--

first()

--1-|--------------------------

of(1, 2, 3)
  .pipe(single())
  .subscribe({
    error: (err) => console.error('single', err),
  });