1. Обзор
В этом руководстве рассматривается практическое использование Java 8 Streams от создания до параллельного выполнения.
Чтобы понять этот материал, читателям необходимо иметь базовые знания о Java 8 (лямбда-выражения, Optional, ссылки на методы).
2. Создание потока
В Java существует множество способов создания экземпляра потока (Stream) из различных источников данных.
После создания экземпляра потока, он не изменяет исходный источник данных. Это означает, что любые операции, выполняемые над потоком (например, фильтрация, маппинг, сортировка), не влияют на сам исходный набор данных. Это обеспечивает безопасность данных и предотвращает нежелательные побочные эффекты.
Благодаря тому, что потоки не модифицируют исходные данные, из одного и того же источника можно создать несколько потоков. Это позволяет одновременно выполнять различные операции обработки данных над одним и тем же набором данных без изменения исходных данных.
2.1. Пустой поток
Мы должны использовать метод empty() для создания пустого потока:
Stream<String> streamEmpty = Stream.empty();
Мы часто используем метод empty() при создании, чтобы избежать возврата null для потоков без элементов:
public Stream<String> streamOf(List<String> list) {
return list == null || list.isEmpty() ? Stream.empty() : list.stream();
}
2.2. Поток коллекции
Мы также можем создать поток из любого типа Collection (Collection, List, Set):
Collection<String> collection = Arrays.asList("a", "b", "c");
Stream<String> streamOfCollection = collection.stream();
2.3. Поток массива
Массив также может быть источником потока:
Stream<String> streamOfArray = Stream.of("a", "b", "c");
Мы также можем создать stream из существующего массива или части массива:
String[] arr = new String[]{"a", "b", "c"};
Stream<String> streamOfArrayFull = Arrays.stream(arr);
Stream<String> streamOfArrayPart = Arrays.stream(arr, 1, 3); // "b" "c"
2.4. Stream.builder()
Желаемый тип должен быть дополнительно указан в правой части инструкции,Когда используется builder, в противном случае метод build() создаст экземпляр Stream<Object>:
Stream<String> streamBuilder =
Stream.<String>builder().add("a").add("b").add("c").build();
2.5. Stream.generate()
Метод generate() принимает Supplier<T> для генерации элемента. Поскольку результирующий поток бесконечен, разработчик должен указать желаемый размер, иначе метод generate() будет работать до тех пор, пока не достигнет предела памяти:
Stream<String> streamGenerated =
Stream.generate(() -> "element").limit(10);
Приведенный выше код создает последовательность из десяти строк со значением "element".
2.6. Stream.iterate()
Другой способ создания бесконечного потока - это использование метода iterate():
Stream<Integer> streamIterated = Stream.iterate(40, n -> n + 2).limit(20);
Первый элемент результирующего stream является первым параметром метода iterate(). При создании каждого следующего элемента указанная функция применяется к предыдущему элементу. В приведенном выше примере вторым элементом будет 42.
2.7. Поток примитивов
Java 8 предлагает возможность создавать потоки из трех примитивных типов: int, long и double. Поскольку Stream<T> является универсальным интерфейсом, и нет способа использовать примитивы в качестве параметра типа с generics, были созданы три новых специальных интерфейса: IntStream, LongStream, DoubleStream.
Использование новых интерфейсов устраняет ненужную автоматическую упауовку, что позволяет повысить производительность:
IntStream intStream = IntStream.range(1, 3);
LongStream longStream = LongStream.rangeClosed(1, 3);
Метод range(int startInclusive, int endExclusive) создает упорядоченный поток от первого параметра ко второму параметру. Оно увеличивает значение последующих элементов с шагом, равным 1. Результат не включает последний параметр, это просто верхняя граница последовательности.
Метод rangeClosed(int startInclusive, int endInclusive) выполняет то же самое, только с одним отличием, включен второй элемент. Мы можем использовать эти два метода для генерации любого из трех типов потоков примитивов.
Начиная с Java 8, класс Random предоставляет широкий спектр методов для генерации потоков примитивов. Например, следующий код создает DoubleStream, который состоит из трех элементов:
Random random = new Random();
DoubleStream doubleStream = random.doubles(3);
2.8. Поток строк
Мы также можем использовать String в качестве источника для создания потока с помощью метода chars() класса String. Поскольку в JDK нет интерфейса для CharStream , мы используем IntStream вместо этого для представления потока символов.
IntStream streamOfChars = "abc".chars();
Следующий пример разбивает строку на подстроки в соответствии с указанным регулярным выражением:
Stream<String> streamOfString =
Pattern.compile(", ").splitAsStream("a, b, c");
2.9. Поток файлов
Кроме того, Java NIO класс Files позволяет нам генерировать Stream<String> текстового файла с помощью метода lines(). Каждая строка текста становится элементом потока:
Path path = Paths.get("C:\\file.txt");
Stream<String> streamOfStrings = Files.lines(path);
Stream<String> streamWithCharset =
Files.lines(path, Charset.forName("UTF-8"));
Кодировка может быть указана в качестве аргумента метода lines().
3. Ссылка на поток
Мы можем создать экземпляр потока и иметь доступную ссылку на него, если вызываются только промежуточные операции. Выполнение терминальной операции делает поток недоступным.
Чтобы продемонстрировать это, мы ненадолго забудем, что наилучшей практикой является объединение последовательности операций в цепочку. Помимо ненужной многословности, технически следующий код допустим:
Stream<String> stream =
Stream.of("a", "b", "c").filter(element -> element.contains("b"));
Optional<String> anyElement = stream.findAny();
Однако попытка повторно использовать ту же ссылку после вызова операции терминала вызовет исключение IllegalStateException:
Optional<String> firstElement = stream.findFirst();
Поскольку исключение IllegalStateException является исключением RuntimeException, компилятор не будет сигнализировать о проблеме. Поэтому очень важно помнить, что потоки Java 8 нельзя использовать повторно.
Такое поведение логично. Мы разработали streams для применения конечной последовательности операций к источнику элементов в функциональном стиле, а не для хранения элементов.
Итак, чтобы предыдущий код работал должным образом, необходимо внести некоторые изменения:
List<String> elements =
Stream.of("a", "b", "c").filter(element -> element.contains("b"))
.collect(Collectors.toList());
Optional<String> anyElement = elements.stream().findAny();
Optional<String> firstElement = elements.stream().findFirst();
4. Потоковый конвейер
Для выполнения последовательности операций над элементами источника данных и агрегирования их результатов нам понадобятся три части: исходный код, промежуточные операции и терминальная операция.
Промежуточные операции возвращают новый измененный поток. Например, чтобы создать новый поток вместо существующего без нескольких элементов, следует использовать метод skip():
Stream<String> onceModifiedStream =
Stream.of("abcd", "bbcd", "cbcd").skip(1);
Если нам нужно больше одной модификации, мы можем связать промежуточные операции. Давайте предположим, что нам также нужно заменить каждый элемент текущего Stream<String> подстрокой из первых нескольких символов. Мы можем сделать это, объединив методы skip() и map():
Stream<String> twiceModifiedStream =
stream.skip(1).map(element -> element.substring(0, 3));
Поток сам по себе ничего не стоит; пользователя интересует результат операции терминала, который может быть значением некоторого типа или действием, применяемым к каждому элементу потока. Мы можем использовать только одну терминальную операцию для каждого потока.
Правильный и наиболее удобный способ использования потоков - это конвейер потока, который представляет собой цепочку из источника потока, промежуточных операций и терминальной операции:
List<String> list = Arrays.asList("abc1", "abc2", "abc3");
long size = list.stream().skip(1)
.map(element -> element.substring(0, 3)).sorted().count();
5. Отложенный вызов
Промежуточные операции являются ленивыми. Это означает, что они будут вызываться только в том случае, если это необходимо для выполнения терминальной операции.
Например, давайте вызовем метод wasCalled(), который увеличивает внутренний счетчик при каждом вызове:
private long counter;
private void wasCalled() {
counter++;
}
Теперь давайте вызовем метод, который был вызван() из операции filter():
List<String> list = Arrays.asList(“abc1”, “abc2”, “abc3”);
counter = 0;
Stream<String> stream = list.stream().filter(element -> {
wasCalled();
return element.contains("2");
});
Поскольку у нас есть источник из трех элементов, мы можем предположить, что метод filter() будет вызван три раза, а значение переменной counter будет равно 3. Однако выполнение этого кода вообще не изменяет счетчик , он по-прежнему равен нулю, поэтому метод filter() даже не был вызван ни разу. Причина, это отсутствие терминальной операции.
Давайте немного перепишем этот код, добавив операцию map() и терминальную операцию, findFirst(). Мы также добавим возможность отслеживать порядок вызовов методов с помощью ведения журнала:
Optional<String> stream = list.stream().filter(element -> {
log.info("filter() was called");
return element.contains("2");
}).map(element -> {
log.info("map() was called");
return element.toUpperCase();
}).findFirst();
Результирующий журнал показывает, что мы дважды вызывали метод filter() и один раз метод map() . Это потому, что конвейер выполняется вертикально. В нашем примере первый элемент stream не удовлетворял предикату filter. Затем мы вызвали метод filter() для второго элемента, который прошел фильтр. Не вызывая filter() для третьего элемента, мы перешли по конвейеру к методу map().
Операция findFirst() удовлетворяет только одному элементу. Итак, в этом конкретном примере отложенный вызов позволил нам избежать двух вызовов метода, одного для filter() (для третьего элемента) и одного для map() (для первого элемента).
6. Порядок выполнения
С точки зрения производительности, правильный порядок является одним из наиболее важных аспектов операций объединения в цепочку в конвейере stream:
long size = list.stream().map(element -> {
wasCalled();
return element.substring(0, 3);
}).skip(2).count();
Выполнение этого кода увеличит значение счетчика на три. Это означает, что мы вызвали метод stream три раза, но значение size равно единице. Итак, результирующий поток содержит только один элемент, и мы выполнили дорогостоящие операции map() без причины два раза из трех.
Если мы изменим порядок методов skip() и map(), то счетчик увеличится всего на единицу. Итак, мы вызовем метод map() только один раз:
long size = list.stream().skip(2).map(element -> {
wasCalled();
return element.substring(0, 3);
}).count();
Это подводит нас к следующему правилу: промежуточные операции, которые уменьшают размер потока, должны быть размещены перед операциями, которые применяются к каждому элементу. Итак, нам нужно сохранить такие методы, как skip(), filter(), и distinct() в верхней части нашего конвейера stream.
7. Сокращение потока
API имеет множество терминальных операций, которые сводят поток к типу или примитиву: count(), max(), min(), и sum(). Однако эти операции работают в соответствии с предопределенной реализацией. Ну и что, если разработчику необходимо настроить механизм сокращения потока? Есть два метода, которые позволяют нам сделать это, методы reduce() и collect().
7.1. Метод reduce() (объединение)
Существует три варианта этого метода, которые отличаются своими сигнатурами и типами возвращаемых данных. Они могут иметь следующие параметры:
identity – начальное значение для накопителя или значение по умолчанию, если поток пуст и накапливать нечего
accumulator – функция, которая определяет логику агрегирования элементов. Поскольку accumulator создает новое значение для каждого шага объединения, количество новых значений равно размеру потока, и полезно только последнее значение. Это не очень хорошо сказывается на производительности.
combiner – функция, которая агрегирует результаты сумматора. Мы вызываем combiner только в параллельном режиме, чтобы объединить результаты accumulators из разных потоков.
Теперь давайте посмотрим на эти три метода в действии:
OptionalInt reduced =
IntStream.range(1, 4).reduce((a, b) -> a + b); // 6 (1 + 2 + 3)
int reducedTwoParams =
IntStream.range(1, 4).reduce(10, (a, b) -> a + b); // 16 (10 + 1 + 2 + 3)
int reducedParams = Stream.of(1, 2, 3)
.reduce(10, (a, b) -> a + b, (a, b) -> {
log.info("combiner was called");
return a + b;
});
Результат будет таким же, как в предыдущем примере (16), и логирования не будет, что означает, что combiner не вызывался. Чтобы combiner работал, поток должен быть параллельным:
int reducedParallel = Arrays.asList(1, 2, 3).parallelStream()
.reduce(10, (a, b) -> a + b, (a, b) -> {
log.info("combiner was called");
return a + b;
});
Результат здесь другой (36), и combiner вызывался дважды. Здесь объединение выполняется по следующему алгоритму: накопитель запускается три раза путем добавления каждого элемента потока к identity. Эти действия выполняются параллельно. В результате у них получилось (10 + 1 = 11; 10 + 2 = 12; 10 + 3 = 13;). Теперь combiner может объединить эти три результата. Для этого требуется две итерации (12 + 13 = 25; 25 + 11 = 36).
7.2. Метод collect()
Объединение потока также может быть выполнено с помощью другой терминальной операции, метода collect(). Он принимает аргумент типа Collector, который определяет механизм объединения. Для большинства распространенных операций уже созданы предопределенные коллекторы. К ним можно получить доступ с помощью типа Collectors.
В этом разделе мы будем использовать следующий список в качестве источника для всех потоков:
List<Product> productList = Arrays.asList(new Product(23, "potatoes"),
new Product(14, "orange"), new Product(13, "lemon"),
new Product(23, "bread"), new Product(13, "sugar"));
Преобразование потока в коллекцию (Collection, List или Set):
List<String> collectorCollection =
productList.stream().map(Product::getName).collect(Collectors.toList());
Объединение в строку:
String listToString = productList.stream().map(Product::getName)
.collect(Collectors.joining(", ", "[", "]"));
Метод joiner() может иметь от одного до трех параметров (разделитель, префикс, суффикс). Самое удобное в использовании joiner() заключается в том, что разработчику не нужно проверять, достигает ли поток своего конца, чтобы применить суффикс, а не разделитель. Коллектор позаботится об этом.
Обработка среднего значения всех числовых элементов потока:
double averagePrice = productList.stream()
.collect(Collectors.averagingInt(Product::getPrice));
Обработка суммы всех числовых элементов потока:
int summingPrice = productList.stream()
.collect(Collectors.summingInt(Product::getPrice));
Методы averagingXX(), summingXX() и summarizingXX() могут работать с примитивами (int, long, double) и с их классами-оболочками (Integer, Long, Double). Еще одной мощной функцией этих методов является обеспечение маппинга. В результате разработчику не нужно использовать дополнительную операцию map() перед методом collect().
Сбор статистической информации об элементах stream:
IntSummaryStatistics statistics = productList.stream()
.collect(Collectors.summarizingInt(Product::getPrice));
Используя результирующий экземпляр типа IntSummaryStatistics, разработчик может создать статистический отчет, применив метод toString(). Результатом будет строка, общая для этой “IntSummaryStatistics{count=5, sum=86, min=13, average=17200000, max=23}.”
Также легко извлечь из этого объекта отдельные значения для count, sum, min, и average, применив методы getCount(), getSum(), getMin(), getAverage(), и getMax(). Все эти значения могут быть извлечены из одного конвейера.
Группировка элементов stream в соответствии с указанной функцией:
Map<Integer, List<Product>> collectorMapOfLists = productList.stream()
.collect(Collectors.groupingBy(Product::getPrice));
В приведенном выше примере поток был сведен к Map, который группирует все продукты по их цене.
Разделение элементов stream на группы в соответствии с некоторым предикатом:
Map<Boolean, List<Product>> mapPartioned = productList.stream()
.collect(Collectors.partitioningBy(element -> element.getPrice() > 15));
Этот код использует потоки данных (Streams) для разделения списка продуктов (productList) на две группы на основе цены продукта. Результатом будет Map, где ключами являются Boolean значения (true или false), а значениями — списки продуктов, соответствующие этим ключам.
Приведение коллектора к дополнительному преобразованию:
Set<Product> unmodifiableSet = productList.stream()
.collect(Collectors.collectingAndThen(Collectors.toSet(),
Collections::unmodifiableSet));
В данном конкретном случае сборщик преобразовал stream в Set, а затем создал из него неизменяемый Set.
Кастомный сборщик:
Если по какой-либо причине необходимо создать кастомный коллектор, самый простой и наименее подробный способ сделать это - использовать метод of() типа Collector.
Collector<Product, ?, LinkedList<Product>> toLinkedList =
Collector.of(LinkedList::new, LinkedList::add,
(first, second) -> {
first.addAll(second);
return first;
});
LinkedList<Product> linkedListOfPersons =
productList.stream().collect(toLinkedList);
Метод of используется для создания экземпляра Collector. Первым аргументом является функция, которая создает аккумулятор (в нашем случае, новый LinkedList). Вторым аргументом — функция, которая добавляет элемент в аккумулятор. Третий аргумент — функция, объединяющая два аккумулятора в один. В данном случае, она просто добавляет все элементы из второго аккумулятора в первый. В этом примере экземпляр коллектора был объединен в LinkedList<Product>.
8. Параллельные потоки
До появления Java 8 распараллеливание было сложным. Появление ExecutorService и forkJoin немного упростило жизнь разработчику, но все же стоило запомнить, как создать конкретный исполнитель, как его запускать и так далее. В Java 8 представлен способ реализации параллелизма в функциональном стиле.
API позволяет нам создавать параллельные потоки, которые выполняют операции в параллельном режиме. Если источником потока является коллекция или массив, этого можно достичь с помощью метода parallelStream():
Stream<Product> streamOfCollection = productList.parallelStream();
boolean isParallel = streamOfCollection.isParallel();
boolean bigPrice = streamOfCollection
.map(product -> product.getPrice() * 12)
.anyMatch(price -> price > 200);
Если источником потока является нечто иное, чем коллекция или массив, следует использовать метод parallel():
IntStream intStreamParallel = IntStream.range(1, 150).parallel();
boolean isParallel = intStreamParallel.isParallel();
По сути, Stream API автоматически использует платформу forkJoin для параллельного выполнения операций. По умолчанию будет использоваться общий пул потоков.
При использовании потоков в параллельном режиме избегайте блокирования операций. Также лучше использовать параллельный режим, когда для выполнения задач требуется аналогичное количество времени. Если одна задача длится намного дольше другой, это может замедлить рабочий процесс всего приложения.
Поток в параллельном режиме может быть преобразован обратно в последовательный режим с помощью метода sequential():
IntStream intStreamSequential = intStreamParallel.sequential();
boolean isParallel = intStreamSequential.isParallel();
9. Заключение
Stream API - это мощный, но простой для понимания набор инструментов для обработки последовательности элементов. При правильном использовании это позволяет нам сократить огромное количество шаблонного кода, создавать более читаемые программы и повышать производительность приложения.
В большинстве примеров кода, показанных в этой статье, мы оставили потоки неиспользованными (мы не применяли метод close() или терминальную операцию). В реальном приложении не оставляйте созданный поток неиспользованным, так как это приведет к утечке памяти.
Top comments (0)