Поток CompletableFuture, созданный из итератора, не оценивается лениво - PullRequest
0 голосов
/ 16 мая 2018

Я немного борюсь с тем, как и когда завершаются фьючерсы. Я создал этот тестовый пример:

import org.junit.Test;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StreamOfCompletableFuturesTest {
    @Test
    public void testList() {
        completeFirstTwoElements(
                Stream.of("list one", "list two", "list three", "list four", "list five")
        );
    }

    @Test
    public void testIterator() {
        Iterator<String> iterator = Arrays.asList("iterator one", "iterator two", "iterator three", "iterator four", "iterator five").iterator();

        completeFirstTwoElements(
            StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
        );
    }

    private void completeFirstTwoElements(Stream<String> stream) {
        stream
                .map(this::cf)
                .limit(2)
                .parallel()
                .forEach(cf -> {
                    try {
                        System.out.println(cf.get());
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                });
    }

    private CompletableFuture<String> cf(String result) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("Running " + result);
            return result;
        });
    }
}

И вывод:

Running list one
Running list two
list two
list one
Running iterator one
Running iterator two
Running iterator three
Running iterator four
Running iterator five
iterator two
iterator one

Метод testList работает как положено. CompletableFuture оцениваются только в самом конце, поэтому после метода limit остались только первые два элемента.

Однако метод testIterator является неожиданным. Все CompletableFuture завершены, а ограничение выполняется только после этого.

Если я удаляю метод parallel() из потока, он работает как положено. Однако обработка (forEach()) должна выполняться параллельно, потому что в моей полной программе это длительный метод.

Кто-нибудь может объяснить, почему это происходит?

Похоже, это зависит от версии Java, поэтому я на 1.8:

$ java -version
java version "1.8.0_92"
Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)

Ответы [ 2 ]

0 голосов
/ 16 мая 2018

Ваше утверждение «все CompletableFuture s выполнены» эквивалентно «все CompletableFuture s созданы», так как после выполнения supplyAsync оценка поставщика была назначена независимо от того, будет ли кто-тов конце концов, вызовите get или нет.

Итак, что вы здесь воспринимаете, это оценка функции, переданной в map, даже если последующая обработка не будет потреблять результат.Это правильное поведение;функция может быть оценена для большего количества элементов, чем необходимо, в произвольном порядке или даже одновременно, при условии, что Stream будет впоследствии использовать правильные результаты в отношении предела и порядка встречи.

Теперь, оцениваем либудет происходить больше элементов, чем необходимо, и сколько избыточных элементов будет обработано, это детали реализации, и реализация изменилась, как обсуждалось в « Внутренние изменения для лимита и неупорядоченного потока ».Хотя эти вопросы и ответы касаются неупорядоченных потоков, вполне вероятно, что аналогичные улучшения были сделаны для упорядоченных потоков.

Вывод заключается в том, что не следует предполагать, что функции оцениваются только для минимального количества требуемых элементов.Это снизит эффективность параллельной обработки.Это все еще применяется, даже когда Java 9 улучшил параллельную операцию limit.Простое изменение может повторно ввести оценку большего количества элементов:

private void completeFirstTwoElements(Stream<String> stream) {
    stream.map(this::cf)
          .filter(x -> true)
          .limit(2)
          .parallel()
          .forEach(cf -> System.out.println(cf.join()));
}
0 голосов
/ 16 мая 2018

Параллелизм применяется ко всему конвейеру, поэтому вы не можете реально контролировать то, что будет выполнено до применения limit() в параллельной Stream.Единственная гарантия состоит в том, что то, что после limit() будет выполнено только для сохраненных элементов.

Различие между ними, вероятно, связано с некоторыми деталями реализации или другими Stream характеристиками.Фактически, вы можете легко изменить поведение, играя на характеристике SIZED.Кажется, когда Stream имеет известный размер, обрабатываются только 2 элемента.

Так, например, применение простого filter() приведет к потере размера версии списка:

completeFirstTwoElements(
        Stream.of("list one", "list two", "list three", "list four", "list five").filter(a -> true)
);
*Например, 1014 * выводит:
Running list one
Running list five
Running list two
Running list three
list one
list two

И без использования неизвестного размера версия Spliterator.spliterator() "исправляет" поведение:

Iterator<String> iterator = Arrays.asList("iterator one", "iterator two", "iterator three", "iterator four", "iterator five").iterator();

completeFirstTwoElements(
        StreamSupport.stream(Spliterators.spliterator(iterator, Spliterator.ORDERED, 5), false)
);

Вывод:

Running iterator two
Running iterator one
iterator one
iterator two
...