Messages asynchrones avec Spring Integration

Spring Integration supporte l’échange de messages asynchrone sans avoir besoin d’ajouter de librairie externe. Il supporte aussi bien le mode point-to-point que le mode publish-subscribe, avec plusieurs variantes.

Principaux concepts

Channel

Un channel est le canal de communication entre clients. Il y a principalement trois interfaces dans Spring Framework (module spring-messaging, pas besoin de Spring Integration).

MessageChannel sert à envoyer des messages.

  • MessageChannel

    • send(Message message): boolean

    • send(Message message, long timeout): boolean

MessageChannel channel = ...;
channel.send(new GenericMessage<>(content));

PollableChannel hérite de MessageChannel. En plus de l’envoi de message, elle sert à réceptionner des messages de façon bloquante.

  • PollableChannel
    MessageChannel

    • receive(): Message<?>

    • receive(long timeout): Message<?>

PollableChannel channel = ...;
Message message = channel.receive();

SubscribableChannel hérite de MessageChannel. En plus de l’envoi de message, elle sert à s’abonner à la réception des futurs messages.

  • SubscribableChannel
    MessageChannel

    • subscribe(MessageHandler handler): boolean

    • unsubscribe(MessageHandler handler): boolean

SubscribableChannel channel = ...;
MessageHandler handler = message -> ...
channel.subscribe(handler);

Message

L’interface définissant un message est dans Spring Framework. Elle est assez simple, avec des headers et un contenu de type libre.

  • Message<T>

    • getHeaders(): MessageHeaders

    • getPayload(): T

L’implémentation classique de Message<T> est GenericMessage<T>, immuable avec des headers et un contenu générique.

  • GenericMessage<T>
    Message<T>

    • GenericMessage(T payload)

    • GenericMessage(T payload, Map<String,Object> headers)**

    • getHeaders(): MessageHeaders

    • getPayload(): T

ErrorMessage est une sous-classe de c, et contient un Throwable en contenu. Ce type de message peut aussi contenir le message qui serait à l’origine de l’erreur.

  • GenericMessage<T>
    GenericMessage<Throwable>

    • ErrorMessage(…​)

    • getHeaders(): MessageHeaders

    • getPayload(): Throwable

    • getOriginalMessage(): Message<?>

Cette interface et ces deux classes sont dans Spring Framework.

Spring Integration propose aussi MutableMessage<T> qui, contrairement à GenericMessage<T> autorise la modification des headers. Cette classe doit être utilisée avec précaution car rien ne garantie qu’elle soit thread safe. Il propose aussi AdviceMessage<T> a un contenu générique, et le message d’entrée, et est utilisé avec de l’AOP.

Modèles de communication

Spring Framework n’a qu’une seule implémentation de canal, ExecutorSubscribableChannel, en mode publish-subscribe et qui implémente SubscribableChannel.

Spring Integration est beaucoup plus riche.

  • PublishSubscribeChannel en mode publish-subscribe, implémente SubscribableChannel

  • QueueChannel en mode point-to-point et FIFO, implémente PollableChannel

  • PriorityChannel en mode point-to-point avec un ordre de priorité des messages, hérite de QueueChannel, implémente PollableChannel

  • RendezvousChannel en mode point-to-point en bloquant le producteur, utilisable pour faire du request-reply, hérite de QueueChannel

  • DirectChannel en mode point-to-point, mais en implémentant SubscribableChannel,

  • ExecutorChannel en mode point-to-point, ressemble au DirectChannel mais avec un send(…​) non bloquant

Publish-subscribe

Dans ce mode, chaque message envoyé dans le canal peut être réceptionné par plusieurs consommateurs.

                                                      ┌────────────┐
┌───────────┐       ┌─────────────────────────┐   ┌──>| Consumer#1 |
| Publisher ├──────>| PublishSubscribeChannel ├───┼──>| Consumer#2 |
└───────────┘       └─────────────────────────┘   └──>| Consumer#3 |
                                                      └────────────┘
  • PublishSubscribeChannel
    SubscribableChannel

    • PublishSubscribeChannel()

    • PublishSubscribeChannel(boolean requireSubscribers)

    • PublishSubscribeChannel(Executor executor)

    • PublishSubscribeChannel(Executor executor, boolean requireSubscribers)
       

    • setApplySequence(boolean applySequence)

    • setErrorHandler(ErrorHandler errorHandler)

    • setIgnoreFailures(boolean ignoreFailures)

    • setMinSubscribers(int minSubscribers)
       

    • send(Message message): boolean

    • send(Message message, long timeout): boolean

    • subscribe(MessageHandler handler): boolean

    • unsubscribe(MessageHandler handler): boolean

Avec le constructeur par défaut, la consommation des messages se fait dans le même thread que leur envoi. Pour découpler la consommation de l’envoi, il faut associer un executor.

Le paramètre de constructeur requireSubscribers permet de vérifier si le channel a des consommateurs. Dans ce cas, un message envoyé à un canal sans consommateur va déclencher une exception.

MessageChannel channel = new PublishSubscribeChannel(true);
channel.send(new GenericMessage<>(content));

org.springframework.integration.MessageDispatchingException:
Dispatcher has no subscribers, failedMessage=GenericMessage
[payload=Hello,
 headers={id=c77a385e-a3fc-2874-3434-23c24c4892a2,
          timestamp=1648380606878}]

ExecutorSubscribableChannel est aussi utilisable, un peu plus simple. Il peut aussi fonctionner de façon synchrone ou avec un executor. Ça permet d’avoir un event bus dans Spring Framework, sans Spring Integration.

Point-to-point

Dans ce mode, chaque message envoyé dans le canal ne sera réceptionné que par un consommateur, même si plusieurs sont connectés. De plus, le message reste dans le canal tant qu’il n’est pas consommé.

                                           ┌────────────┐
┌───────────┐       ┌──────────────┐       | Consumer#1 |
| Publisher ├──────>| QueueChannel ├───┐   | Consumer#2 |
└───────────┘       └──────────────┘   └──>| Consumer#3 |
                                           └────────────┘

QueueChannel fonctionne avec une file de messages. Lorsqu’un producteur envoie un message, celui-ci est stocké dans la file jusqu’à ce qu’un consomateur vienne le récupérer. Par défaut, cette file n’est pas limité en taille.

Les messages sont mis à disposition des consommateurs dans leur ordre d’arrivée : c’est du FIFO.

  • QueueChannel
    PollableChannel

    • QueueChannel()

    • QueueChannel(int capacity)

    • QueueChannel(Queue<Message<?>> queue)
       

    • send(Message<?> message): boolean

    • send(Message<?> message, long timeout): boolean

    • purge(MessageSelector selector)

    • receive(): Message<?>

    • receive(long timeout): Message<?>

// Publisher
MessageChannel channel = new QueueChannel();
channel.send(new GenericMessage<>(content));
// Consumer
PollableChannel channel = ...; // find the channel
Message<?> message = channel.receive();

PriorityChannel fonctionne de façon similaire, avec un notion de priorité qui détermine l’ordre de mise à disposition. Par défaut, les messages sont triés par le header priority. De façon optionnelle, on peut utiliser un comparateur de messages.

  • PriorityChannel
    QueueChannel

    • PriorityChannel()

    • PriorityChannel(int capacity)

    • PriorityChannel(Comparator<Message<?>> comparator)

    • PriorityChannel(int capacity, Comparator<Message<?>> comparator)

// Publisher
MessageChannel channel = new QueueChannel();
channel.send(
      new GenericMessage<>(
            content,
            // Priority header
            Map.of(IntegrationMessageHeaderAccessor.PRIORITY, 1)
      )
);

Point-to-point avec abonnement

Dans le chapitre précédent, les consommateurs doivent prélever les messages un à un. Avec DirectChannel, les consommateurs s’abonnent à l’arrivée de nouveaux messages. L’API ressemble à celle du mode pub/sub, mais chaque message n’est délivré qu’à un seul consommateur.

  • DirectChannel
    SubscribableChannel

    • DirectChannel()

    • DirectChannel(LoadBalancingStrategy loadBalancingStrategy)

    • send(Message message): boolean

    • send(Message message, long timeout): boolean

    • subscribe(MessageHandler handler): boolean

    • unsubscribe(MessageHandler handler): boolean

// Consumer
SubscribableChannel channel = new DirectChannel();
channel.subscribe(message -> ...);
// Publisher
MessageChannel channel = ...; // find the channel
channel.send(new GenericMessage<>(content));

Si plusieurs consommateurs souscrivent aux messages d’un canal, chaque message n’est consommé que par un seul d’entre eux. Par défaut, la répartission des messages entre les consommateurs se fait en round robin. Une répartission différente peut être choisie en passant une LoadBalancingStrategy à la création du canal.

Par contre, le choix est limité puisque seul le round robin est disponible dans Spring Integration. Pour une autre stratégie, il faut implémenter soi-même l’interface.

  • LoadBalancingStrategy

    • getHandlerIterator(Message<?> message, Collection<MessageHandler> handlers): Iterator<MessageHandler>

ExecutorChannel est une variation sur le même thème où l’envoi des messages se fait de façon non bloquante, via les threads d’un executor.

  • ExecutorChannel
    SubscribableChannel

    • ExecutorChannel(Executor executor)

    • ExecutorChannel(Executor executor, LoadBalancingStrategy loadBalancingStrategy)

Requête / réponse

Le mode requête / réponse est un dérivé du point à point. Le message initial est envoyé puis consommé en point à point. Une fois le message envoyé, le producteur se met en attente d’un message de réponse sur une file temporaire. Et quand le consommateur reçoit le message, il le traite et envoie un message de réponse dans cette file temporaire.

                                           ┌────────────┐
┌───────────┐       ┌──────────────┐       | Consumer#1 |
| Publisher ├──────>| QueueChannel ├───┐   | Consumer#2 |
└───────────┘<┐     └──────────────┘   └──>| Consumer#3 |
              |                            └──────┬─────┘
              |    ┌───────────────────┐          |
              └────┤ RendezvousChannel |<─────────┘
                   └───────────────────┘

RendezvousChannel est approprié pour servir de file temporaire. Il est bloquant des deux cotés, y compris pour le producteur qui est mis en attente d’un consommateur.

// Publisher
PollableChannel replyChannel = new RendezvousChannel();
Message<?> message =
      new GenericMessage<>(
            content,
            Map.of(MessageHeaders.REPLY_CHANNEL, replyChannel)
      );
channel.send(message);
Message<?> resultMessage = replyChannel.receive();

On peut simplifier le code avec GenericMessagingTemplate.

GenericMessagingTemplate n’utilise pas de RendezvousChannel, mais un canal privé qui ne peut contenir qu’un seul message.

Annotations

Spring Integration propose un stéréotype de composant avec @MessageEndpoint.

@MessageEndpoint
public class EventEndpoint {
  // ...
}

@ServiceActivator

Cette annotation doit être apposée sur une méthode d’un bean, de préférence annotée avec @MessageEndpoint. Elle permet de s’abonner aux messages d’un canal, qu’il soit subscribable ou pollable.

@MessageEndpoint
public class EventEndpoint {
  @ServiceActivator(inputChannel = "channel/subscribable")
  public void onMessage(String message) {
    // ...
  }
}

Le paramètre d’entrée de la méthode peut être un Message ou directement le type du contenu, comme dans l’exemple ci-dessus. Dans le cas du contenu, on peut aussi ajouter des paramètres d’en-tête, annotés avec @Header, ou un paramètre pour l’ensemble des valeurs d’en-tête, de type Map<String, Object> et annoté avec @Headers.

@MessageEndpoint
public class EventEndpoint {
  @ServiceActivator(inputChannel = "channel/subscribable")
  public void onMessage(String message, @Header("token") String token) {
    // ...
  }
}

@Poller

Si le canal est pollable, il faut associé au poller. Il peut être défini sous forme d’un bean global.

  @Bean(name = PollerMetadata.DEFAULT_POLLER)
  public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setTrigger(new PeriodicTrigger(1, TimeUnit.SECONDS));
    return pollerMetadata;
  }

Dans l’exemple présenté ici, la récupération des messages se fait toutes les secondes. Les triggers sont ceux de Spring Task Executor.

Pour qu’une méthode utilise un poller alternatif, il faut utiliser l’annotation @Poller en passant le nom du bean.

@MessageEndpoint
public class EventEndpoint {
  @ServiceActivator(
      inputChannel = "channel/subscribable",
      poller = @Poller("sw.slowPoller"))
  public void onMessage(String message, @Header("token") String token) {
    // ...
  }
}

La même annotation peut être utilisée pour utiliser un poller interne.

@MessageEndpoint
public class EventEndpoint {
  @ServiceActivator(
      inputChannel = "channel/pollable",
      poller = @Poller(fixedDelay = "1000"))
  public void onMessage(String message, @Header("token") String token) {
    // ...
  }
}

@ServiceActivator avec retour

La méthode peut aussi retourner un message, ou un objet quelconque qui servira de contenu à un message. Ce message sera envoyé dans le canal spécifié comme outputChannel.

  @ServiceActivator(
      inputChannel = "channel/pollable",
      outputChannel = "channel/subscribable")
  public String onMessage(String message) {
    systemLogger.log(INFO, "Message received on channel/pollable: " + message);
    return "Reply-" + message;
  }

Si aucun outputChannel n’est pas spécifié, le retour est envoyé dans le replyChannel du message entrant. Et s’il n’y en a pas non plus, une exception est levée.

Si le message a un replyChannel et qu’un outputChannel est spécifié, alors c’est outputChannel qui est utilisé.

@Publisher

On peut aussi envoyer des messages via des méthodes annotées. Pour que ça fonctionne, il faut activer spécifiquement la fonctionnalité.

@Configuration
@EnableIntegration
@EnablePublisher
public class IntegrationConfiguration {
  // ...
}

Après ça, on peut annoter des méthodes de bean avec @Publisher. L’appel de ces méthodes déclenche l’envoi d’un message avec le résultat de la méthode en payload. Le nom passé en paramètre est le nom de bean du canal.

  @Publisher("channel/publish")
  public String publish(String message) {
    // ...
    return result;
  }

Comme pour les méthodes annotées avec @ServiceActivator, il est possible d’ajouter des paramètres d’en-tête annotés avec @Header.

  @Publisher("channel/publish")
  public String publish(String message, @Header("token") String token) {
    // ...
    return result;
  }

Concepts avancés

ChannelInterceptor

Comme le nom l’indique bien, un ChannelInterceptor est rattaché à un Channel. Il s’insère au niveaux des interactions entre le message et le canal, avant/après l’envoi et avant/après la réception.

  • ChannelInterceptor

    • preSend(Message<?> message, MessageChannel channel): Message<?>

    • postSend(Message<?> message, MessageChannel channel, boolean sent)

    • afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex)

    • preReceive(MessageChannel channel): boolean

    • postReceive(Message<?> message, MessageChannel channel): Message<?>

    • afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex)

Toutes les méthodes de l’interface sont default. On n’implémente que celles qu’on veut effectivement redéfinir.

@Component
public class SecurityInterceptor implements ChannelInterceptor {

  private final SecurityService securityService;

  public SecurityInterceptor(SecurityService securityService) {
    this.securityService = securityService;
  }

  @Override
  public Message<?> preSend(Message<?> message, MessageChannel channel) {
    String token = message.getHeaders().get("token", String.class);
    securityService.validateToken(token);
    return ChannelInterceptor.super.preSend(message, channel);
  }
}

L’intercepteur peut être associé à un ou plusieurs canaux. De même un canal peut avoir plusieurs intercepteurs.

  @Bean
  public PollableChannel pollableChannel(SecurityInterceptor securityInterceptor) {
    QueueChannel channel = new QueueChannel();
    channel.addInterceptor(securityInterceptor);
    return channel;
  }

Un intercepteur peut fonctionner de façon non intrusive, pour du logging par exemple. Il peut empêcher l’envoi du message, dans une logique de sécurité. Il peut aussi transformer un message avant l’envoi ou après la réception.

EIP patterns

Spring Integration implémente plein de patterns d’intégration. Ça fait plusieurs sujet à approfondir sur ce thème.