Code bloquant dans Vert.x

Le credo de Vert.X, c’est :

Don’t block the event loop

En clair, il ne faut pas appeler de méthode bloquante dans un verticle classique. Mais il propose quelques solutions pour exécuter quand même du code bloquant.

Code bloquant

Le problème, c’est qu’on trouve souvent du code bloquant dans des librairies tierces :

  • JDBC

  • I/O

Pour faire mes tests, j’ai juste utilisé Thread.sleep().

   private String sleep(long duration) {
       try {
           System.out.println(String.format("Start sleeping for %s ms", duration));
           Thread.sleep(duration);
           return "End sleeping";
       } catch (InterruptedException e) {
           e.printStackTrace();
           return "Problem while sleeping";
       }
   }

Délai

Il y a deux délais qui entrent en jeu.

  • max event loop execute time (ns) : c’est le délai principal ; si l’event loop n’a pas rendu la main après ce délai, une alerte est loguée,

  • blocked thread check period (ms) : c’est l’intervalle de vérification

Par défaut, le premier est à deux secondes et le deuxième à une seconde. Donc on est certain d’avoir une alerte si on dépasse les trois secondes de bloquage, et on peut éventuellement en avoir une entre deux et trois secondes.

Ces délais peuvent être modifiés au démarrage de Vert.X.

   public static void main(String[] args) {
       VertxOptions options = new VertxOptions();

       options.setBlockedThreadCheckInterval(500L);  // 0.5 s
       options.setMaxEventLoopExecuteTime(500_000L); // 0.5 s

       Vertx vertx = Vertx.vertx(options);
       ....
   }

La modification de l’intervalle de vérification est particulièrement utile en debug (cf. mon blog).

executeBlocking

La façon la plus simple d’exécuter du code bloquant est de l’enrober dans un vertx.executeBlocking(…​). Cette méthode prend en premier paramètre la lambda bloquante à exécuter. Le deuxième paramètre est un callback appelé lorsque la première lambda est terminée.

Pour marquer la fin du traitement, il faut appeler event.complete(). L’objet passé en paramètre est récupéré dans le result du callback.

   private void sleepBlocking(long duration) {
       vertx.executeBlocking(event -> event.complete(sleep(duration)),
                             event -> System.out.println(event.result().toString()));
   }

Cet appel peut se faire depuis n’importe quel verticle. La lambda est exécutée sur un worker-thread. Lorsqu’on appelle plusieurs executeBlocking(…​) à la suite, dans le même contexte, il sont exécutés sur le même thread, de façon séquentielle. En ajoutant false en 2° paramètre, on demande à ce que l’exécution ne respecte pas l’ordre, et dans ce cas les lambdas sont appelées en parallèle, sur des threads séparés.

Attention quand même à la durée de blocage. Le temps d’exécution d’une tâche doit être inférieur au paramètre maxWorkerExecuteTime qui vaut une minute par défaut. Et ça aussi, ça peut se configurer.

   public static void main(String[] args) {
       ....
       options.setMaxEventLoopExecuteTime(500_000L); // 0.5 s
       ....
   }

La vérification de durée de traitement est faite par le BlockedThreadChecker, comme pour l'event loop. Elle est aussi sensible à la modification de son intervalle de vérification.

Worker Verticle

Un worker verticle est un verticle qui fonctionne sur son propre thread, issu du worker thread pool.

   public static void main(String[] args) {
       ...
       vertx.deployVerticle(new DatabaseVerticle(),
                            new DeploymentOptions().setWorker(true));
       ...
   }

Threads Pool

Les threads utilisés font partie d’un pool créé au démarrage de Vert.X. La taille du pool est fixée au démarrage avec options.setWorkerPoolSize(…​) ; la taille par défaut est 20.

On peut aussi faire travailler un pool de threads spécifique. On instancie un WorkerExecutor pour ça, et on c’est lui qui fera l'`executeBlocking(…​)`.

   private void sleepBlocking(long duration) {
       WorkerExecutor executor = vertx.createSharedWorkerExecutor("toto");
       executor.executeBlocking(
               event -> event.complete(sleep(duration)),
               event -> System.out.println(event.result().toString()));
   }

Attention, ça ne marche pas si on est déjà sur un worker thread. Par exemple, si on est dans un worker verticle, on restera sur le pool par défaut.