Framework Executor du JDK

Le rôle principal du framework executor est d’ajouter une couche d’abstraction au dessus de la classe Thread.

Il est constitué d’un ensemble de classes et d’interfaces qui gèrent des pools de threads et l’exécution de tâches en parallèle. Elles sont rassemblées dans le paquetage java.util.concurrent.

Il permet aussi de planifier des tâches récurrentes avec ScheduledExecutorService. Comme quoi, il n’y a pas besoin de Quartz ou de Spring Task Scheduler pour couvrir des besoins simples.

Classes et interfaces qui héritent de Executor

Interfaces

L’interface Executor est au sommet de la hiérarchie. Elle définit juste une méthode pour exécuter une tâche Runnable. Elle n’a pas beaucoup d’intérêt toute seule, et il n’y a pas d’implémentation directe dans le JDK.

  • Executor

    • execute(Runnable command)

L’interface ExecutorService définit une façon de soumettre des tâches, puis de récupérer leurs résultats sour forme de Future<?>. Elle a aussi des méthodes pour arrêter le pool de threads.

  • ExecutorService

    • submit(Callable<T> task) : Future<T>

    • invokeAll(Collection<Callable<T>> tasks) : List<Future<T>>

    • shutdown()

L’interface ScheduledExecutorService permet de planifier des tâches répétées.

  • ScheduledExecutorService

    • schedule(Callable<V> callable, long delay, TimeUnit unit) : ScheduledFuture<V>

    • schedule(Runnable command, long delay, TimeUnit unit) : ScheduledFuture<?>

    • scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) : ScheduledFuture<?>

    • scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) : ScheduledFuture<?>

Pool de threads

La classe ThreadPoolExecutor implémente ExecutorService et permet d’exécuter des tâches asynchrones sur pool de threads. La configuration de ce pool est à plusieurs dimensions, permettant des comportements variés.

Soumettre une tâche

Une tâche est une implémentation de Runnable ou Callable<?>, sous forme de classe ou d’expression lambda.

Une tâche est soumise au pool par la méthode submit(…​), si on veut récupérer le résultat de la tâche via un Future<?>, ou la méthode execute(…​), sans résultat.

  Future<Instant> future = executor.submit(() -> someTask());
  ...
  Instant result = future.get();

Soumettre plusieurs tâches

En soumettant un ensemble de tâches, on récupère une liste de Future<?>, qui permettent de récupérer leurs résultats.

  List<Future<Instant>> futures = executor.invokeAll(
    List.of(
      () -> firstTask(),
      () -> secondTask(),
      () -> otherTask()
    )
  );

La méthode invokeAny(…​) permet de soumettre un ensemble de tâche et de récupérer le résultat de la tâche qui finit en premier. Les autres tâches sont annulées.

  Instant firstFinished = executor.invokeAny(
    List.of(
      () -> firstTask(),
      () -> secondTask(),
      () -> otherTask()
    )
  );

Instancier un ThreadPoolExecutor

On peut instancier directement le ThreadPoolExecutor, ou passer par les méthodes de fabrique d'Executors. On passe des paramètres de dimensionnement et de file d’attente au constructeur.

var executor = new ThreadPoolExecutor(
                        10, 20,
                        10, TimeUnit.SECONDS,
                        new LinkedBlockingQueue<Runnable>());

Dans l’ordre, les paramètres sont les suivants :

  • corePoolSize = taille du pool principal

  • maximumPoolSize = taille maximale du pool

  • keepAliveTime et unit = durée d’inactivité d’un thread avant son éviction

  • workQueue = file d’attente des tâches

En jouant avec ces paramètres, on a une gamme de comportements variés, avec ou sans pool secondaire, sans file d’attente, avec une file d’attente limitée ou illimitée. Les méthodes de création de la classe Executors permettent d’instancier des ThreadPoolExecutor avec des configurations prédéfinies.

  • Executors

    • newSingleThreadExecutor(): ExecutorService

    • newFixedThreadPool(int nThreads): ExecutorService

    • newWorkStealingPool(int parallelism): ExecutorService

    • unconfigurableExecutorService(ExecutorService executor): ExecutorService

newFixedThreadPool(n) permet de créer une pool à taille fixe. En réalité, le pool démarre avec 0 thread et voit sa taille augmenter selon le besoin. Tous les threads sont dans le pool principal, et y restent, même inutilisés. Lorsque le pool est plein, les nouvelles tâches sont mises en attente dans une file illimitée.

ExecutorService pool = Executors.newFixedThreadPool(42);
// Equivalent à
ExecutorService pool = new ThreadPoolExecutor(
                              42, 42,
                              0L, TimeUnit.MILLISECONDS,
                              new LinkedBlockingQueue<Runnable>());

Avec un tel pool, les tâches soumises sont réparties sur les différents threads tant qu’il en reste à disposition. S’ils sont tous occupés, les tâches sont mises en attente jusqu’à ce qu’un thread se libère.

newSingleThreadExecutor() fait la même chose, avec un seul thread. De plus, le pool créé ainsi n’est pas reconfigurable, contrairement à un pool construit avec newFixedThreadPool(1), dont on peut changer la taille ultérieurement.

ExecutorService pool = Executors.newSingleThreadExecutor();

newCachedThreadPool() permet de créer une pool à taille illimitée (ou presque). Les threads sont créé au fil de la demande et sont détruits après une inactivité d’une minute.

ExecutorService pool = Executors.newCachedThreadPool();
// Equivalent à
ExecutorService pool = new ThreadPoolExecutor(
                              0, Integer.MAX_VALUE,
                              60L, TimeUnit.SECONDS,
                              new SynchronousQueue<Runnable>());

Avec ce pool, les threads sont créés au fil du besoin. Lorsqu’on soumet une tâche, un thread existant lui est affecté et si aucun n’est disponible, un nouveau thread est créé et ajouté au thread.

Fonctionnement du pool

Quand une tâche est soumise,

  • si un thread est disponible, il est utilisé,

  • sinon, si corePoolSize n’est pas atteint, un nouveau thread est démarré pour exécuter la tâche,

  • sinon, s’il y a de la place dans la file, la taĉhe y est mise en attente,

  • sinon, si maximumPoolSize n’est pas atteint, un nouveau thread est démarré pour exécuter la tâche,

  • sinon la tâche est rejetée.

Il y a essentiellement deux types de files d’attente :

  • Avec SynchronousQueue, il n’y a pas de fil d’attente, la file n’a aucune capacité. Celui qui veut insérer est mis en attente d’une demande de retrait.

  • Avec LinkedBlockingQueue, on a une vraie file d’attente, en FIFO. Par défaut la capacité est (quasi-)infinie, mais peut être réduite.

Arrêter le pool et/ou les tâches

Le pool est arrêté avec la méthode shutdown() ou shutdownNow().

L’appel de shutdown() n’a pas d’impact sur les tâches déjà soumises, elle sont exécutées normalement. Par contre, la soumission de nouvelles tâches échoue avec une RejectedExecutionException.

On peut savoir que toutes les tâches sont terminées avec la méthode isTerminated(). On peut aussi attendre avec awaitTermination(…​).

  • ExecutorService

    • shutdown()

    • shutdownNow(): List<Runnable>

    • awaitTermination(long timeout, TimeUnit unit): boolean

    • isShutdown(): boolean

    • isTerminated(): boolean

Chaque tâche peut être interrompue individuellement grâce à la méthode cancel() de Future<?>. Si on passe false, ça n’annulera que les tâches qui n’ont pas démarré. Si on passe true, la méthode essaie d’interrompre la tâche, via la méthode interrupt() du thread.

  • Future<V>

    • cancel(boolean mayInterruptIfRunning)(): boolean

    • isCancelled(): boolean

Fork / Join

Comme ThreadPoolExecutor, la classe ForkJoinPool implémente ExecutorService. Mais son rôle est conceptuellement très différent, il sert à exécuter des tâches qui se scindent puis fusionnent pour récolter le résultat. Si la tâche est assez simple, elle est exécutée, sinon elle est scindée en sous-tâches.

Les exemples les plus couramment rencontrés sur le Web sont les suivants :

  • Calcul sur un gros tableau de nombres (min, max, somme,…​)

  • Parcour recursif de répertoires

  • Aspiration de site Web

Soumettre une tâche

  • ForkJoinPool

    • execute(ForkJoinTask<?> task)

    • invoke(ForkJoinTask<T> task): T

    • submit(ForkJoinTask<T> task): ForkJoinTask<T>

Les méthodes execute(…​) et invoke(…​) sont synchrones. La première ne retourne rien alors que la seconde retourne le résultat de la tâche. Toutes deux fonctionnent avec une tâche de type ForkJoinTask<T>.

La méthode submit(…​) renvoie la tâche elle-même qui implémente Future<T>, qui permet de récupérer le résultat plus tard.

  • ForkJoinTask<V>

    • fork(): ForkJoinTask<V>

    • join(): V

    • exec(): boolean

    • getRawResult(): V

    • setRawResult(V value)

Scinder la tâche et collecter le résultat

La méthode fork() permet d’ajouter la tâche à la file d’attente. Elle ne doit normalement être appelée que depuis une tâche parente en cours d’exécution sur le pool. C’est cette méthode qu’on appelle sur chaque sous-tâche lors d’une scission. A la place, on peut aussi appeler ForkJoinTask.invokeAll(…​) en passant les sous-tâches en paramètre.

La méthode join() est appelée sur un tâche pour récupérer le résultat et le combiner à celui des autres sous-tâches. Elle ressemble au get() de Future<?>.

Implémenter une tâche

Pour utiliser un ForkJoinPool, on doit implémenter le traitement dans une classe qui hérite de ForkJoinTask. On hérite rarement directement de ForkJoinTask, mais d’une de ses sous-classes.

  • RecursiveTask<V>
    ForkJoinTask<V>

    • compute(): V

  • RecursiveAction
    ForkJoinTask<Void>

    • compute()

  • CountedCompleter
    ForkJoinTask<V>

    • compute()

  • RecursiveTask<V> est une tâche recursive avec retour.

  • RecursiveAction est une tâche recursive sans retour.

  • CountedCompleter est le dernier arrivé (JDK 8) et est un peu plus complexe à coder. Il a sa propre page.

public BigTask extends RecursiveTask<Result> {
  public Result compute() {
    if (simple) {
      return doTheJob();
    } else {
      var subTask1 = new CustomTask();
      subTask1.fork();

      var subTask2 = new CustomTask();
      subTask2.fork();

      return subTask1.join().combine(subTask2.join());
    }
  }
}

La tâche initiale est soumise directement au pool. Ça peut être fait de façon synchrone.

    ForkJoinPool pool = new ForkJoinPool();
    Result result = pool.invoke(new BigTask(initialData));
    ...

Ça peut aussi être fait de façon asynchrone.

    ForkJoinPool pool = new ForkJoinPool();
    ForkJoinTask<Result> future = pool.submit(new BigTask(initialData));
    ...
    Result result = future.join();

Planification de tâches

La classe ScheduledThreadPoolExecutor, qui implémente l’interface ScheduledExecutorService permet de planifier des tâches répétées, avec choix de la taille du pool de threads.

Instancier et configurer le scheduler

Comme pour ThreadPoolExecutor, on peut instancier directement ScheduledThreadPoolExecutor ou passer par les fabriques de Executors. Par contre, la configuration du pool est plus simple que celui du ThreadPoolExecutor.

Même si ScheduledThreadPoolExecutor hérite de ThreadPoolExecutor, elle n’en exploite pas les possibilités de pool. Elle utilise juste un pool de taille fixe avec une fille d’attente illimitée.

  ScheduledThreadPoolExecutor scheduler
        = new ScheduledThreadPoolExecutor(5);
  • Executors

    • newSingleThreadScheduledExecutor(): ScheduledExecutorService

    • newScheduledThreadPool(int corePoolSize): ScheduledExecutorService

newScheduledThreadPool(n) permet de créer un pool de taille fixe.

  ScheduledExecutorService scheduler
        = Executors.newScheduledThreadPool(42);

newSingleThreadScheduledExecutor() permet d’utiliser un thread unique, sans possibilité de reconfiguration.

  ScheduledExecutorService scheduler
        = Executors.newSingleThreadScheduledExecutor();

Planifier une tâche

Avec la méthode schedule(…​), on planifie une tâche à exécution ultérieure. Cette tâche s’exécutera une seule fois.

  ScheduledFuture<Result> future
        = scheduler.schedule(..., 3, TimeUnit.MINUTES);

Avec la méthode scheduleAtFixedRate(…​), on planifie une tâche qui va s’exécuter plusieurs fois, à intervalle régulier. L’intervalle est calculé entre le démarrage d’une tâche et le démarrage de la tâche précédente. Lorsqu’on programme la tâche, on choisit dans quel délai elle s’exécutera pour la première fois.

  ScheduledFuture<Result> future
        = scheduler.scheduleAtFixedRate(..., 10, 3, TimeUnit.MINUTES);

Avec la méthode scheduleWithFixedDelay(…​), l’intervalle est calculé entre le démarrage d’une tâche et la fin de la tâche précédente.

  ScheduledFuture<Result> future
        = scheduler.scheduleWithFixedDelay(..., 10, 3, TimeUnit.MINUTES);
Le respect des intervalles et délais n’est pas garanti. En effet, l’exécution des tâches dépend de la disponibilité de threads dans le pool.

Toutes ces méthodes renvoient un ScheduledFuture<?>, qui hérite de Future<?> et permet donc de récupérer le résultat ultérieurement. Il hérite aussi de Delayed qui donne le délai avant la prochaine exécution.

  • ScheduledFuture<V>

    • getDelay(TimeUnit unit): long

    • get(): V

Arrêter le scheduler et/ou les tâches

Le scheduler s’arrête avec les méthodes shutdown() ou shutdownNow(). Une question supplémentaire se pose par rapport à un pool simple, concernant les tâches qui sont planifiées à plus tard ainsi que les tâches périodiques.

  • ScheduledExecutorService

    • setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value)

    • setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value)
       

    • shutdown()

    • shutdownNow(): List<Runnable>

    • awaitTermination(long timeout, TimeUnit unit): boolean

    • isShutdown(): boolean

    • isTerminated(): boolean

Par défaut, les tâches ultérieures sont conservées après un shutdown(). En revanche, les tâches périodiques sont déprogrammées.

Quels que soient ces paramètres, un appel de shutdownNow() arrête toutes les tâches.