Mais de quoi va-t-on parler ?
De Java. Oui je préfère le dire dès le début. Cet article va parler de Java !
Les gatherers
sont le premier ajout d'importance à l’API java.util.Streams
depuis sa sortie, et on parle de 2014. Cela signifie qu’il ne s’était pas passé grand-chose depuis environ longtemps.
Arrivés en preview en Java 22, il sont finalement standards en Java 24.
Table des matières
Remise à niveau
Même si on ne va pas faire une revue complète de l'API java.util.Streams
, je vous propose de commencer par un petit quizz, tranquilles, posés.
Quiz 1/3
1 public class Quizz {
2 public static void main(String[] args) {
3 getPeople()
4 .map(person -> person.getName())
5 .toList();
6 }
7 }
Admettons que la méthode getPeople
renvoie une instance de Stream
de 10 objets de type Person
.
Ici, le code est simple, on applique la méthode map
qui prend en paramètre une Function
, qui extraie une String
à partir d'une instance de Person
en renvoyant le résultat de la méthode Person.getName()
.
La question est :
Combien de fois la ligne 4 est-elle invoquée ?
① 1 fois ?
② 10 fois ?
③ Non Jérôme, elle n'est pas invoquée la ligne 4.
④ It depends!
spoiler
Vous êtes-vous dit qu’il y avait plusieurs bonnes réponses ?
La bonne réponse est:
④ It depends!
En réalité, la question était mal posée. La question, aurait du être :
"Est-ce que la méthode map
est invoquée une fois, 10 fois, ainsi de suite ?" ou "Est-ce que la fonction qui est passée en paramètre de la méthode map
est invoquée ?"
La méthode map
, est bien appelée une seule fois, par contre, la fonction qui lui est passée en paramètre, elle, est bien invoquée 10 fois.
Il faut systématiquement différencier les méthodes de l’API Stream, qui ne font que de la configuration de pipeline, et les fonctions/predicats/... qu’on leur passe.
Quiz 2/3
Bon, ok, on a compris. À partir de maintenant, la question portera systématiquement sur le nombre d’invocations de la Function
passée en paramètre de la méthode map
.
1 public class Quizz {
2 public static void main(String[] args) {
3 getPeople()
4 .map(person -> person.getName());
5 }
6 }
La question est :
Combien de fois la ligne 4 est-elle invoquée ?
① 1 fois ?
② 10 fois ?
③ Non Jérôme, elle n'est pas invoquée la ligne 4.
spoiler
La bonne réponse est :
③ Non Jérôme, elle n’est pas invoquée la ligne 4.
Dans un Stream
, il y a deux types d’opérations :
- Des opérations intermédiaires.
- Des opérations finales.
Les opérations intermédiaires ne font que configurer un pipeline d’exécution, elles ne déclenchent rien. Tant qu’on n’a pas appelé une méthode finale sur un stream
, il ne se passe rien du tout.
Quiz 3/3
1 public class Quiz {
2 public static void main(String[] args) {
3 getPeople()
4 .map(person -> person.getName())
5 .count();
6 }
7 }
La question est :
Combien de fois la ligne 4 est-elle invoquée ?
① 1 fois ?
② 10 fois ?
③ Non Jérôme, elle n'est pas invoquée la ligne 4.
spoiler
La bonne réponse est :
③ Non Jérôme, elle n’est pas invoquée la ligne 4.
Eeeeeeet oui, l’API stream
est intelligente, et parmi les opérations terminales, il y en a certaines qui possèdent des raccourcis. Et c'est le cas de la méthode count
qui est capable d’évaluer si toutes les opérations qui ont été exécutées avant elle peuvent avoir un impact sur la cardinalité de ce qu’il y a en sortie.
Ici, une seule opération map
n’aura aucun impact sur la cardinalité, et donc, inutile de l’invoquer, ça n’a aucun intérêt. On n’a pas besoin de transformer des personnes en chaînes de caractères pour savoir qu’il y en à 10.
C'est assez important de comprendre que vous n'avez pas de garantie d'invocation. Et si, par exemple, vous faites partie de la team peek
-
💀 ARRÊTEZ-CA MAINTENANT 💀-, et que vous utilisez un count
, votre peek
risquerait bien de ne jamais être invoqué.
Rappels
- Il y a des méthodes intermédiaires :
map
,filter
, etc. -
Il y a des méthodes terminales :
anyMatch
,toList
,count
, etc.L’invocation d’une opération terminale est le seul déclencheur de l’utilisation d’un pipeline. Je peux enchaîner autant d’opérations intermédiaires que je veux, il ne se passera jamais rien.
Et dès que j’appelle une opération terminale, je consomme mon stream, et je ne peux plus rien en faire. Et évidemment, un peu tautologique, je ne peux pas avoir deux méthodes terminales qui s’enchaînent.
Certaines opérations terminales possèdent des courts-circuits.
On peut implémenter un nombre infini ♾️ d'opérations terminales grâce à l'API Collector.
Il existe un nombre fini d'opérations intermédiaires fournies pas l'API
Évidemment, l’API gatherer est là pour rattraper cette terrible injustice et nous laisser la capacité de coder toutes les opérations intermédiaires que l’on veut.
Collectors vs Gatherers
Nous sommes habitués à l'API Collectors
, mais tout de même rafraîchissons nous la mémoire.
Les Collectors
Collector collector;①
Collectors.groupingBy(...);②
stream.collect(collector);③
① L'interface Collector
que nous devons implémenter.
② La classe Collectors
, qui fourni un certain nombre de collectors déjà implémentés
③ L'utilisation d'un collector via la méthode Stream#collect
Et maintenant leurs jumeaux
Les Gatherers
Gatherer gatherer;①
Gatherers.windowFixed(...);②
stream.gather(gatherer);③
① L'interface Gatherer
que nous devons implémenter.
② La classe Gatherers
, qui fourni un certain nombre de gatherers déjà implémentés. Ici windowFixed
qui accumule n éléments avant de les pousser dans le stream
sous forme de liste.
③ L'utilisation d'un gatherer via la méthode Stream#gather
Code utilisé pour les exemples
À partir de maintenant, tous les exemples suivront le code suivant
public void main() throws IOException {
Stream<Oeuvre> oeuvres = Reader.read().stream();
prettyPrint(
oeuvres.gather(
filter(oeuvre -> oeuvre.titre().contains("N")) ①
)
);
}
public record Oeuvre(
String titre,
Integer anneeParution,
boolean perdue) {
}
① C'est ici que nous placerons les gatherer custom
L'interface Gatherer
💡
Le code que nous allons regarder n'est pas le vrai code, mais une version épurée de l'interface.
Si vous voulez voir le vrai code, je rappelle que tout ceci est libre d'accès
package java.util.stream;
public interface Gatherer<T, A, R> {
default Supplier<A> initializer(); ①
Integrator<A, T, R> integrator(); ②
default BinaryOperator<A> combiner(); ③
default BiConsumer<A, Downstream<? super R>> finisher(); ④
}
On peut déjà constater que l'interface fait une utilisation massive des génériques.
Pour les illustrer, prenons l'exemple d'un gatherer qui réimplémente l'opération intermédaire map
(d'une instance d'Oeuvre
vers une String
en utilisant la méthode Oeuvre#titre()
).
Le type T
représente le type de l'objet entrant, ici Oeuvre
. Le type A
représente le type de l'état du gatherer (on y reviendra). Le type R
représente le type de retour du gatherer, ici String
.
① La méthode initializer
Elle permet d'initialiser l'état, si besoin. Elle possède une implémentation par défaut et renvoie un Supplier
d'état.
⚠️
Attention, elle ne renvoie pas un nouvel état, mais unSupplier
d'état.
② La méthode integrator
Son rôle est de retourner un Integrator
. C'est l'objet qui va intervenir sur le stream, et sur lequel nous allons revenir tout au long des exemples.
⚠️
Attention, encore une fois, elle n'implémente pas la méthode qui agit. C'est une factory.
③ La méthode combiner
Elle renvoie un BinaryOperator<X>
, c'est à dire, une BiFunction<X,X,X>
. Son rôle est de combiner les états en cas d'exécutions parallèles.
④ La méthode finisher
Elle renvoie un BinaryConsumer
, qui permet en cas de besoin d'exécuter une action en fin de traitement.
Let's code !
Oui, ok, t'es mignon, mais c'est quand même super abstrait ce que tu nous racontes là.
Et c'est vrai !
Je vous propose donc de redévelopper la méthode filter
dont vous connaissez déjà le fonctionnement.
On recode la méthode filter
C'est un gatherer
simple, donc nous n'aurons besoin que d'implémenter la méthode integrator
.
Nous allons l'implémenter à base d'anonymous inner class. À l'ancienne.
package org.github.jtama.gatherornot;
import java.util.function.Predicate;
import java.util.stream.Gatherer;
public class Filter implements Gatherer<Oeuvre, Object, Oeuvre> {
private final Predicate<Oeuvre> filter;
Filter(Predicate<Oeuvre> filter) {
this.filter = filter;
}
@Override
public Integrator<Object, Oeuvre, Oeuvre> integrator() {
return new Integrator<Object, Oeuvre, Oeuvre>() {
@Override
public boolean integrate( ④
Object state, ①
Oeuvre oeuvre, ②
Downstream<? super Oeuvre> downstream) { ③
if (filter.test(oeuvre)) {
return downstream.push(oeuvre);
}
return true;
}
};
}
}
① L'état que nous ignorons pour l'instant.
② L'instance d'Oeuvre
en cours de traitement dans le stream
.
③ Le downstream
représente ce qui vient après dans le stream
.
④ La méthode retourne un boolean
qui permet d'indiquer à l'API stream
si le gatherer accepte d'autres éléments. Comme, il s'agit d'un filtre, nous renvoyons toujours true
ou la propagation du résultat de la méthode downstream.push
.
Ici, pas vraiment de difficulté, mais un code vraiment verbeux que l'on va pouvoir simplifier.
L'implémentation de la classe Integrator
ne contient qu'une méthode. On peut donc écrire une lambda.
package org.github.jtama.gatherornot;
import java.util.function.Predicate;
import java.util.stream.Gatherer;
public class Filter implements Gatherer<Oeuvre, Object, Oeuvre> {
private final Predicate<Oeuvre> filter;
Filter(Predicate<Oeuvre> filter) {
this.filter = filter;
}
@Override
public Integrator<Object, Oeuvre, Oeuvre> integrator() {
return (_, oeuvre, downstream) -> { ①
if (filter.test(oeuvre)) {
return downstream.push(oeuvre);
}
return true;
};
}
}
① La variable state
n'étant pas utilisée, on peux utiliser un _
.
L'implémentation de la classe Filter
ne contient également qu'une méthode. Donc rebelote, transformation en lambda.
package org.github.jtama.gatherornot;
import java.util.function.Predicate;
import java.util.stream.Gatherer;
public class Filter {
public static Gatherer<Oeuvre, ?, Oeuvre> filter(Predicate<Oeuvre> filter) {
return () -> (_, oeuvre, downstream) -> {
if (filter.test(oeuvre)) {
return downstream.push(oeuvre);
}
return true;
};
}
}
Et voilà. C'est plus court. C'est mieux.
Non.
On a beaucoup perdu en lisibilité, mais les interfaces Gatherer
et Integrator
offrent des méthodes utilitaires pour la regagner.
package org.github.jtama.gatherornot;
import java.util.function.Predicate;
import java.util.stream.Gatherer;
import java.util.stream.Gatherer.Integrator;
public class Filter {
public static Gatherer<Oeuvre, ?, Oeuvre> filter(Predicate<Oeuvre> filter) {
return Gatherer.of(
Integrator.ofGreedy(①
(_, oeuvre, downstream) -> {
if (filter.test(oeuvre)) {
return downstream.push(oeuvre);
}
return true;
}));
}
}
① On utilise ici la méthode ofGreedy
(pour ceux qui ne parlent pas couramment anglais, greedy veut dire avide, allez tout de suite regarder le film Se7en), qui permet de dire à l'API stream
que ce Gatherer
n'interrompra jamais de lui même la consommation du stream, et qui permet à l'API stream
de faire des optimisations.
À partir de maintenant, nous utiliserons toujours cette façon d'écrire le code.
Un stream avec un index ?
N'avez-vous déjà pas eu envie d'accéder à l'index de l'élément en cours de traitement ? N'avez vous pas déjà essayé l'implémentation suivante ?
Stream<Oeuvre> oeuvres = Reader.read().stream();
AtomicInteger index = new AtomicInteger(0);①
oeuvres.map(value -> new Tuple<>(index.getAndIncrement(), value)));②
① On utilise un AtomicInteger
pour conserver/incrémenter l'index
② En admettant que la classe Tuple
existe
C'est une approche qui fonctionne très bien jusqu'à ce que quelqu'un ait la bonne idée d'ajouter un petit .parallel()
avant.
Bon je me permets d'ajouter qu'une bonne vieille boucle for
est certainement ce qu'il vous faut.
Mais, pas d'inquiétude, on va pouvoir arranger ça.
Commençons par noter, que pour la première fois depuis le début de l'article nous allons avoir besoin de quelque chose pour maintenir l'état.
Et comme l'API n'est pas trop mal faite, dans un Gatherer
, le nom de ce concept est state
. ¯\(ツ)/¯
package org.github.jtama.gatherornot;
import java.util.stream.Gatherer;
import java.util.stream.Gatherer.Integrator;
public class WithIndex {
public static Gatherer<Oeuvre, Counting, Tuple<Integer, Oeuvre>> withIndex() {
return Gatherer.ofSequential( ③
() -> new Counting(), ①
Integrator.ofGreedy(
(state, oeuvre, downstream) -> downstream.push(new Tuple<>(state.index++, oeuvre)))); ②
}
static class Counting {
int index;
}
}
① Notre Supplier
d'état, ici une instance de la classe counting qui contient l'index
② L'implémentation est exactement la même
③ On utilise la méthode ofSequential
, qui permet d'interdire l'exécution du gatherer
en parallel, même si le développeur le demande.
Un groupingBy
, mais pas terminal.
Pour rappel, la méthode groupingBy
est une opération finale, je vous encourage à aller lire la doc si vous voulez en savoir plus.
Ce que l'on cherche à implémenter c'est une opération intermédiaire qui va regrouper un ensemble cohérent d'élément dans une liste avant de les relacher dans le stream.
Par exemple avec un stream contenant les œuvres de Shakespeare classées par date de parution, j'aimerai pouvoir regrouper les oeuvres par année. Et tant qu'on y est, j'aimerais pouvoir l'utiliser pour d'autres objets, avec d'autres critères de regroupement.
Cela signifie que nous allons faire un Gatherer
générique.
Pour une fois, on va commencer en regardant l'utilisation !
Stream<Oeuvre> oeuvres = Reader.read().stream();
oeuvres.gather(series(Oeuvre::anneeParution)));①
① Je passe à ma factory de gatherer
un extracteur de clef.
Et maintenant l'implémentation :
package org.github.jtama.gatherornot;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Gatherer;
public class Serie {
public static <K,V> Gatherer<V, State, List<V>> series(Function<V,K> keyExtractor) {
return Gatherer.ofSequential(①
State::new,②
Gatherer.Integrator.ofGreedy((state, value, downstream) -> {
// First invocation or the same key value
if (state.key == null ||
keyExtractor.apply(value).equals(state.key)) { ③
state.values.add(value);
state.key = keyExtractor.apply(value);
return true;
}
var more = downstream.push(state.values); ④
state.values = new ArrayList<>();
state.key = keyExtractor.apply(value);
state.values.add(value);
return more;
}),
(state, downstream) -> downstream.push(state.values));
}
public static class State<K,V> {
private K key;
private List<V> values = new ArrayList<>();
}
}
① Oui on va rester en séquentiel, sinon il faudra s'asseoir sur le fait d'avoir de la donnée ordonnée.
② L'état va maintenir la clef de regroupement et la liste pour l'accumulation des valeurs regroupées.
③ Si c'est le premier tour de boucle ou que la valeur de regroupement est égale à celle de l'état, on accumule et réclame plus d'éléments.
④ Sinon, on pousse les valeurs déjà accumulées en conservant , et puis on reinitialise l'état et on propage le retour de l'invocation de la méthode downstream.push
.
Je me rend bien compte que ce gatherer
est un peu plus compliqué, mais l'avantage, c'est que si je veux regrouper mes éléments en fonction de la première lettre du titre, je peux.
Stream<Oeuvre> oeuvres = Reader.read().stream();
oeuvres.sorted(Comparator.comparing(Oeuvre::titre))
.gather(series(oeuvre -> oeuvre.titre().substring(0,1)));
Et ça fonctionnerait même avec une hypothétique classe Person
! Si je veux regrouper un stream
de personne par année de naissance :
Stream<Person> persons = Reader.read().stream();
persons.gather(series(Person::birthDate);
Et maintenant, on fusionne des streams
!
Il n'est pas possible, simplement, à ce jour de fusionner des stream
. Ce que je veux je veux obtenir est l'équivalent du join
de RxJava
Mais en plus strict.
Voilà comme ça.
Plus précisément je ne veux permettre que des paires complètes.
On va reprendre le principe de commencer par l'utilisation.
Stream<Oeuvre> oeuvres = Reader.readUnordered().stream();
prettyPrint(oeuvres.gather(merge(streamToBeMerged)));
Le code ci-dessus devrait produire :
┌──────────────────────────┬─────────────────────────┐
│Revue de presse │Titre │
├──────────────────────────┼─────────────────────────┤
│Beaucoup de bruits pour│Peines d amour gagnées │
│rien │ │
├──────────────────────────┼─────────────────────────┤
│Je ne m en souviens même│Cardenio │
│plus. │ │
├──────────────────────────┼─────────────────────────┤
│Jamais entendu parler │La Tempête │
├──────────────────────────┼─────────────────────────┤
│Numéro 1 sept semaines│Les Deux Gentilshommes de│
│d affilées │Vérone │
├──────────────────────────┼─────────────────────────┤
│Meilleure pièce de l année│Les Joyeuses Commères de│
│ │Windsor │
├──────────────────────────┼─────────────────────────┤
│Un chef d oeuvre │Mesure pour mesure │
└──────────────────────────┴─────────────────────────┘
Et maintenant l'implémentation :
package org.github.jtama.gatherornot;
import java.util.Iterator;
import java.util.stream.Gatherer;
import java.util.stream.Stream;
public class Merge {
public static <T,Y> Gatherer<Y, Iterator<T>, Tuple<T, Y>> merge(Stream<T> stream) {
return Gatherer.ofSequential(
stream::iterator, ①
Gatherer.Integrator.of(
(state, item, downstream) -> {
if (state.hasNext()) ②
return downstream.push(new Tuple<>(state.next(), item));
return false; ③
}));
}
}
① Pour savoir si il me reste quelque chose dans mon stream "à fusionner" je ne peux directement faire un Stream.hasNext
, ou Stream.next
(Cette notion n'est pas cohérente la philosophie Stream
). Je dois avoir un accès direct aux éléments de la collection. Je dois passer par un Iterator
.
② Si mon itérateur "à fusionner" en a encore dans le ventre, on pousse au dowstream.
③ Sinon on interrompt la consommation du stream
. Je rappelle qu'on a dit qu'on ne voulait que des paires complètes. Et puis c'est mon code, et je fais ce que je veux. Si vous voulez une autre implémentation, je ne vous empêche pas.
On y va ou pas ?
Alors que cet article touche à sa fin, j'espère vous avoir montré qu'il existe en effet des cas pour lesquels les Gatherer
vont nous permettre de répondre à de réels besoins. J'aimerais aussi attirer votre attention sur le fait que si ils ont mis autant de temps à arriver, c'est certainement parce qu'on peut déjà faire beaucoup avec l'existant, pourvu qu'on prenne le temps de regarder ce que l'on a déjà à disposition.
Vous trouverez dans le dépôt github joint, tout le code présenté et même plus.
To Gather, or not to Gather, that is the question.
Slides
Readable version available on Github Pages → ici
Generate
jbang qrcode@maxandersen -i slides/images/qr_inlay.png <open feedback url> --qr-colo="linear-gradient(90deg, rgba(36,14,0,1) 0%, rgba(9,121,105,1) 35%, rgba(0,212,255,1) 100%);"
jbang qrcode@maxandersen -i slides/image/github.png https://github.com/jtama/to-gather-or-not-to-gather
podman container run --rm -v $(pwd)/slides:/documents -w /documents asciidoctor/docker-asciidoctor:1.80.0 asciidoctor-revealjs -r asciidoctor-diagram index.adoc
Run locally
podman container run --name prez --rm -d -p 8080:80 -v $(pwd)/slides:/usr/share/nginx/html nginx
podman container run --name coder --rm -d -p 8443:8443 -v $(pwd)/app:/config/workspace ghcr.io/jtama/java_jbang_codeserver:latest
Et surtout n'oubliez pas :
Top comments (0)