Как сгладить список в потоке завершаемых фьючерсов? - PullRequest
3 голосов
/ 13 октября 2019

У меня есть это:

Stream<CompletableFuture<List<Item>>>

как я могу преобразовать его в

Stream<CompletableFuture<Item>>

Где: второй поток состоит из всех элементов внутри каждого из списков впервый поток.

Я посмотрел на thenCompose, но это решает совершенно другую проблему, которая также называется "выравнивание".

Как это можно сделать эффективно, потоковым способомбез блокирования или преждевременного использования большего количества элементов потока, чем необходимо?

Вот моя лучшая попытка:

    ExecutorService pool = Executors.newFixedThreadPool(PARALLELISM);
    Stream<CompletableFuture<List<IncomingItem>>> reload = ... ;

    @SuppressWarnings("unchecked")
    CompletableFuture<List<IncomingItem>> allFutures[] = reload.toArray(CompletableFuture[]::new);
    CompletionService<List<IncomingItem>> queue = new ExecutorCompletionService<>(pool);
    for(CompletableFuture<List<IncomingItem>> item: allFutures) {
        queue.submit(item::get);
    }
    List<IncomingItem> THE_END = new ArrayList<IncomingItem>();
    CompletableFuture<List<IncomingItem>> ender = CompletableFuture.allOf(allFutures).thenApply(whatever -> {
        queue.submit(() -> THE_END);
        return THE_END;
    });
    queue.submit(() -> ender.get());
    Iterable<List<IncomingItem>> iter = () -> new Iterator<List<IncomingItem>>() {
        boolean checkNext = true;
        List<IncomingItem> next = null;
        @Override
        public boolean hasNext() {
            if(checkNext) {
                try {
                    next = queue.take().get();
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
                checkNext = false;
            }
            if(next == THE_END || next == null) {
                return false;
            }
            else {
                return true;
            }
        }
        @Override
        public List<IncomingItem> next() {
            if(checkNext) {
                hasNext();
            }
            if(!hasNext()) {
                throw new IllegalStateException();
            }
            checkNext = true;
            return next;
        }
    };
    Stream<IncomingItem> flat = StreamSupport.stream(iter.spliterator(), false).flatMap(List::stream);

Сначала это работает, к сожалению, это имеет фатальную ошибку:поток, кажется, преждевременно завершается, прежде чем получить все элементы.

1 Ответ

0 голосов
/ 19 октября 2019

Как я написал в своем комментарии, это невозможно.

Рассмотрим какой-нибудь произвольный сервис, который будет возвращать CompletableFuture<Integer>:

CompletableFuture<Integer> getDiceRoll();

Теперь я могу преобразовать это CompletableFuture<Integer> к Stream<CompletableFuture<List<Object>>> без проблем:

Stream<CompletableFuture<List<Object>>> futureList = Stream.of(getDiceRoll().thenApply(n -> List.of(new Object[n])));

Предположим, что существует общий способ превратить Stream<CompletableFuture<List<T>>> в Stream<CompletableFuture<T>>:

<T> Stream<CompletableFuture<T> magic(Stream<CompletableFuture<List<T>>> arg);

Тогда ямогу сделать следующее:

int diceRoll = magic(Stream.of(getDiceRoll().thenApply(n -> List.of(new Object[n])))).count();

Подождите, что?
Теперь я могу получить произвольное целое число из CompletableFuture. Это означает, что с некоторыми инженерными усилиями я могу получить всю информацию из CompletableFuture - в конце концов, память - это просто некоторые числа.

Таким образом, мы должны сделать вывод, что такой метод, как magic, не может существовать безнарушая ткань времени.
И вот ответ: такого метода не существует, потому что он не может существовать.

...