Добавить побочный эффект к Java Stream после использования каждого элемента? - PullRequest
1 голос
/ 01 августа 2020

Это упрощенный пример, но он иллюстрирует суть.

Допустим, у меня есть метод, определенный следующим образом:

Stream<String> generateStream() {
  return Stream.of("hello", "world"); // (S)
}

Есть ли способ сделать, например, S напечатать что-нибудь после того, как элемент был потреблен, и потребитель потока не должен был ничего делать или знать об этом?

Например, я бы изменил S так, чтобы это:

generateStream().forEach(System.out::println)

выводит это на консоль:

hello
consumed
world
consumed

Возможно ли это в Java 8, и если да, то как?

Ответы [ 2 ]

3 голосов
/ 03 августа 2020

Вы можете использовать flatMap с закрывающим действием:

Stream<String> generateStream() {
  return Stream.of("hello", "world")
      .flatMap(s -> Stream.of(s).onClose(() -> System.out.println(s+" consumed")));
}

, которое работает по назначению:

generateStream().forEach(System.out::println);
hello
hello consumed
world
world consumed

даже для операций короткого замыкания:

Optional<String> o = generateStream().filter(s -> s.startsWith("he")).findFirst();
o.ifPresent(s -> System.out.println("found "+s));
hello consumed
found hello

Но обратите внимание, что это может иметь c драматическое * влияние на производительность по сравнению с операцией без flatMap. Но для целей отладки или в случаях, когда производительность не имеет значения, это может быть полезно.

Кроме того, имейте в виду, что потребление конвейером Stream не означает, что операция терминала не будет ссылка на него и не прекращать доступ к нему.

Это работает таким образом для операции forEach, но, например, для reduce((a,b) -> a), все элементы потребляются один за другим, но ссылка на первый элемент будут храниться до конца и даже возвращаться в качестве окончательного результата. Для min​(comparator) любой элемент может удерживаться до тех пор, пока не будет обнаружен меньший. И, наконец, такие операции, как toArray(), удерживают и возвращают все элементы в результате.

Кроме того, операции с отслеживанием состояния могут отделять последующую конвейерную обработку от исходного потока. Например, шаг sorted может действовать как toArray, буферизуя все элементы, заставляя их отображаться как потребленные для источника, перед сортировкой массива и продолжением потоковой передачи по массиву. Точно так же distinct будет содержать ссылки на объекты за пределами их потребления и в параллельном потоке, их последующая обработка может быть отложена до точки после закрытия исходного потока.

1 голос
/ 01 августа 2020

вы можете использовать peek в своей цепочке, чтобы использовать элемент и вернуть Stream. forEach закрывает поток


    Stream.of("hello", "world")
        .peek(System.out::println)
        .forEach(word -> System.out.println("consumed")); 

, или вы можете связать потребителей в foreach, чтобы добиться ожидаемого поведения. Stream не имеет явного метода в api для того, что вы спрашиваете, вы можете связать методы, используя peek вместо foreach, чтобы не завершать sh поток или цеплять потребителя в foreach

    Consumer<String> stringConsumer = System.out::println;
    Consumer<String> endConsumed = string -> System.out.println("consumed");
    Stream.of("hello", "world")
            .forEach(stringConsumer.andThen(endConsumed));
...