RabbitMQ Streams

Dans RabbitMQ, les streams sont des queues conçues pour les objectifs suivants:

  • performances, avec un débit optimisé,

  • persistance, avec un stockage optimisé pour un grand nombre de messages et pour permettre le rejeu des messages,

  • mode mixte entre abonnement et consommation.

Avec ces caractéristiques, ça s’éloigne d’AMQP 0.9 et ça peut ressembler un peu à Kafka.

On peut utiliser des stream powered queues avec le protocole AMQP. Mais pour profiter du débit optimisé, il faut utiliser le protocole dédié, et pour ça il faut activer le plug-in et utiliser un client spécifique.

Stream powered queues

Une telle queue se construit de façon classique, avec un argument "x-queue-type"="stream" Ça impose quelques contraintes car une stream powered queue est forcément durable, non exclusive et sans suppression automatique.

  "queues":[
    {
      "name": "s.activity",
      "vhost": "/jtips",
      "durable": true,
      "auto_delete": false,
      "exclusive": false,
      "arguments": {
        "x-queue-type": "stream"
      }
    }
  ]

Ou avec le client AMQP

channel.queueDeclare(
    "s.activity",
    true,         // durable
    false, false, // not exclusive, not auto-delete
    Map.of("x-queue-type", "stream")
);

Cette queue peut être attachée à n’importe quel exchange, comme une queue classique. Ses messages peuvent aussi être consommés par n’importe quel client AMQP.

Stockage et rétention

Avec une queue classique, une fois qu’un message est consommé, il est supprimé du stockage. Avec une queue orientée stream, le message reste dans le stockage. Tous les messages, qu’ils soient consommés ou pas restent dans le stockage dans les limites de la configuration de rétention de la queue.

Sans configuration de rétention, le stockage peut grossir jusqu’à saturation du disque. On peut configurer

  • la taille maximale de la queue avec x-max-length-bytes,

  • l’age maximun des messages avec x-max-age,

  "queues":[
    {
      "name": "s.activity",
      "vhost": "/jtips",
      "durable": true,
      "auto_delete": false,
      "exclusive": false,
      "arguments": {
        "x-queue-type": "stream",
        "x-max-age": "3M",
        "x-max-length-bytes:": "100000000000"
      }
    }
  ]

Ou avec le client AMQP

channel.queueDeclare(
    "s.activity",
    true,         // durable
    false, false, // not exclusive, not auto-delete
    Map.of(
        "x-queue-type", "stream",
        "x-max-age", "6h",
        "x-max-length-bytes:", 10_000_000_000
    )
);

Ces paramètres de rétention sont les seuls utilisés. Ça signifie, qu’on ne peut pas choisir sur chaque message s’il est durable et son TTL, contrairement aux messages classiques.

Protocole et librairie cliente

Pour bénéficier d’un débit optimal, il faut accéder aux queues via le protocole de stream, au lieu d’AMQP. Au lieu de 5672, il utilise le port 5552, qui est ouvert et pris en charge par le plugin stream.

$> rabbitmq-plugins enable rabbitmq_stream

Et pour utiliser ce protocole, les clients doivent utiliser des librairies dédiées.

  <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>stream-client</artifactId>
    <version>0.13.0</version>
  </dependency>

L’API est complètement différente de celle d’AMQP. On crée directement un stream, qui une sorte de queue, et il n’y a plus d'exchange.

Environment environment = Environment.builder().build();
environment.streamCreator()
    .stream("s.activity")
    .maxAge(Duration.ofHours(6))
    .maxLengthBytes(ByteCapacity.GB(10))
    .create();

On connecte les producteurs et consommateurs au stream.

Producer producer = environment.producerBuilder()
        .stream("s.activity")
        .build();

producer.messageBuilder()
        .addData(data.toJson().asBytes())
        .build()
Consumer consumer = environment.consumerBuilder()
        .stream("s.activity")
        .messageHandler((context, message) -> {
          ...
        })
        .build();

Le protocole a été conçu pour fonctionner en cluster, avec un noeud principal et des noeuds répliqués. Lorsqu’un producteur se connecte, il est rattaché au noeud principal, alors que les consommateurs sont connectés aux noeuds répliqués. Pour qu’il soit accessible, chaque noeud transmet son hostname. Comme souvent dans le cloud, ou avec des conteneurs, le nom utilisé peut être privé et ne permet pas d’établir la connexion. On peut contourner ce problème en configurant RabbitMQ ou en réécrivant les adresses coté client.

Coté broker, on peut configurer la propriété stream.advertised_host. Par exemple, en environnement de développement, j’utilise un noeud RabbitMQ dans un conteneur, avec la configuration ci-dessous.

# stream.conf
stream.advertised_host = localhost

La variable d’environnement RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS peut aussi être utilisée au lancement de RabbitMQ.

$> docker run --publish 5552:5552                                                               \
      --env RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost'    \
      rabbitmq:3.9

Coté client, on peut ajouter un AddressResolver à l’instance d'Environment.

Address endpoint = new Address("localhost", 5552);
environment = Environment.builder()
        .host(endpoint.host())
        .port(endpoint.port())
        .addressResolver(address -> endpoint)
        ...
        .build();

Rejeu et consommations multiples

Le rejeu (ou replay) est possible parce que les messages ne sont pas supprimés à la consommation. Quand un message est ajouté dans une queue, il est numéroté de façon séquentiel.

Au démarrage, le consommateur peut juste attendre le prochain message. Il peut aussi demander de démarrer à un index plus ancien, ou depuis le début.

Ce fonctionnement permet aussi à plusieurs consomateurs de se brancher sur la même queue et de lire les mêmes messages. Chaque consommateur peut s’attacher à n’importe quel offset.

Consumer consumer = environment.consumerBuilder()
        .name(clientId)
        .stream("s.activity")
        .offset(OffsetSpecification.last())
        .messageHandler((context, message) -> {
          ...
        })
        .build();
  • OffsetSpecification

    • none(): OffsetSpecification

    • first(): OffsetSpecification

    • offset(long offset): OffsetSpecification

    • timestamp(long timestamp): OffsetSpecification

    • last(): OffsetSpecification

    • next(): OffsetSpecification

Les messages consommés au démarrage dépendent de l'OffsetSpecification passé au builder.

  • none(): aucun message, c’est la valeur par défaut
    La consommation ressemble à celle d’un topic JMS. Au démarrage, aucun message n’est consommé et par la suite, tous les consommateurs reçoivent les messages.

  • first(): tous les messages stockés
    Le consommateur reçoit tous les messages depuis le plus ancien qui est stocké.

  • offset(…​): les messages à partir de l’offset passé en paramètre On choisit le n° du message à partir duquel on veut démarrer. L’offset du message consommé peut être lu sur le contexte.

Consumer consumer = environment.consumerBuilder()
        .offset(OffsetSpecification.offset(storedOffset))
        .messageHandler((context, message) -> {
              long offset = context.offset();
        })
        ...
  • timestamp(…​): les messages à partir du timestamp passé en paramètre
    On choisit le timestamp à partir duquel on veut démarrer.

Consumer consumer = environment.consumerBuilder()
        .offset(OffsetSpecification.timestamp(Instant.now().minus(1, ChronoUnit.HOURS).toEpochMilli()))
        ...
  • last(): le dernier message publié
    Le consommateur reçoit le dernier message enregistré, sauf s’il a déjà été consommé et que le broker a enregistré cette consommation. Cet enregistrement est lié au tracking, comme pour la spécification next().

  • next(): les messages à partir de l’offset enregistré par le broker
    La consommation ressemble à celle d’un topic JMS durable.
    L’enregistrement de l'offset dépend du tracking. Et sans tracking, next() est équivalent à none().

Avec le tracking, l'offset de chaque consommateur est sauvegardé par le broker. Ainsi lors d’un redémarrage, en spécification next(), le consommateur continue de consommer les messages là où il en était.

Le premier prérequis pour utiliser le tracking, c’est que le consommateur ait un nom, et qu’il porte le même nom au redémarrage. Ensuite, on peut utiliser du tracking automatique ou manuel. A partir du moment où il y a un nom, le tracking par défaut est automatique.

consumer = environment.consumerBuilder()
        .name(clientId)
        .stream("s.activity")
        .autoTrackingStrategy().builder()
        .offset(OffsetSpecification.next())
        ...

Le broker stocke ces informations dans la queue, au milieu des messages. De ce fait, stocker l’index à chaque message pourrait avoir un impact non négligeable sur le volume de stockage et sur les performances. La documentation de RabbitMQ préconise de le stocker tous les quelques milliers de messages. Par défaut, c’est tous les 10 000 messages, ou toutes les 5 secondes.

consumer = environment.consumerBuilder()
        .name(clientId)
        .stream("s.activity")
        .autoTrackingStrategy()
            .messageCountBeforeStorage(10_000)
            .flushInterval(Duration.ofSeconds(5))
            .builder()
        .offset(OffsetSpecification.next())
        ...

En tracking manuel, le stockage de l’offset se fait explicitement.

consumer = environment.consumerBuilder()
        .name(clientId)
        .stream("s.activity")
        .manualTrackingStrategy().builder()
        .offset(OffsetSpecification.next()
        .messageHandler((context, message) -> {
            context.storeOffset();
        })
        ...

Le choix du mode de démarrage est compatible avec un client AMQP. Par contre, le tracking ne l’est pas.

Pour ça, on peut lire le header x-stream-offset sur les messages reçu, et on doit ajouter un argument du même nom au consommateur.

channel.basicConsume(
        "s.activity", false,
        Map.of("x-stream-offset", lastOffset),
        (consumerTag, message) ->
            storeLocallyLastOffset(message.getProperties().getHeaders().get("x-stream-offset")),
        (consumerTag, message) -> {});

Les valeurs de l’argument ressemblent à celles de OffsetSpecification. On peut mettre une valeur textuelle :

  • "first" pour recevoir tous les messages,

  • "last" pour recevoir le dernier message, qu’il ait déjà été reçu ou pas puisqu’il n’y a pas de tracking,

  • "next" pour ne rien recevoir,

  • un intervalle, avec une valeur et une unité (Y, M, D, h, m, s) comme par exemple "6h".

Pour choisir l'offset de départ, on passe une valeur mumérique. Pour le timestamp, on passe une instance de Date.

Single Active Consumer

Plus haut, je disais que qu’un stream fonctionnait comme un topic JMS et que les messages étaient consommés par tous les messages. C’est généralement vrai, mais pas toujours.

Avec la fonctionnalité de Single Active Consumer, plusieurs consommateurs s’abonnent au même stream et un seul est actif. Il reçoit les messages alors que les autres sont inactifs et ne reçoivent rien. Un consommateur inactif devient actif lorsque son collègue précédemment actif s’arrête, ainsi ça permet une continuité dans la consommation.

Pour que ça fonctionne, tous les consommateurs doivent avoir le même nom et activer la fonctionnalité.

consumer = environment.consumerBuilder()
        .name(clientId)
        .stream("s.activity")
        .singleActiveConsumer()
        ...

Ce mode annule le choix la spécification d'offset. Au démarrage, le consommateur se comporte forcément comme avec next(), d’après mes essais.

D’autres consommateurs avec des noms différents recevront aussi le message. Ainsi on peut faire des grappes de single active consumers.

Avec Spring

Spring propose une librairie "Spring Cloud Stream" pour utiliser Kafka ou RabbitMQ comme brique d’intégration entre micro-services. Malgré son nom, par défaut elle fait de l’AMQP avec RabbitMQ, et le support de RabbitMQ stream n’est qu’une option secondaire.

Par ailleurs, dans Spring AMQP, il y a un RabbitStreamTemplate et un StreamListenerContainer.

<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit-stream</artifactId>
  <version>3.0.9</version>
</dependency>

Synthèse

Les fonctionnalités suivantes sont accessibles aux clients AMQP et Stream, sans obligation d’avoir le plugin stream.

  • Création de stream powered queues

  • Distribution des messages à plusieurs consommateurs, à la façon des topics JMS (Large fan-outs)

  • Choix de la position de démarrage dans l’historique des messages (Replay, Time-travelling)

Les fonctionnalités suivantes ne sont accessibles qu’aux clients Stream et ont besoin du plugin stream.

  • Optimisation des performances, grâce au débit optimisé

  • Sauvegarde de la position de lecture (Tracking)

  • Single Active Consumer

  • Envoi direct de message, sans passer par un exchange