Как ждать список ListenableFuture с тайм-аутом - PullRequest
0 голосов
/ 21 декабря 2018

Я работаю над проблемой, где у меня есть List<ListenableFuture<T>>.Я хотел бы объединить результаты всех этих фьючерсов в List<T> с таймаутом.Наивный подход будет выглядеть примерно так:

List<T> blockForResponses(List<ListenableFuture<T>> futures, long timeoutMillis) {
    return futures.stream()
        .map(future -> future.get(timeoutMillis,TimeUnit.MILLISECONDS)
        .collect(Collectors.toList());
}

Это не работает, потому что он ожидает тайм-аут для каждого будущего, и я хочу, чтобы это был тайм-аут для всего списка.Отслеживание того, сколько времени прошло, также не работает, потому что, если первый раз выйдет, у него не останется времени, чтобы попробовать другие.

Решение, которое я ищу, приведет в исполнениетайм-аут на все фьючерсы и возврат по истечении этого времени или по завершении всех фьючерсов в списке.Затем я мог бы самостоятельно проверить каждое будущее в списке, чтобы собрать результаты и проверить, какие из них истекли.

Ответы [ 2 ]

0 голосов
/ 21 декабря 2018

Эта проблема оказалась проще, чем я думал.Я смог использовать метод Futures.allAsList и затем поймать TimeoutException:

List<T> blockForResponses(List<ListenableFuture<T>> futures, long timeoutMillis) {
    ListenableFuture<List<T>> futureOfList = Futures.allAsList(futures);
    List<T> responses;
    try {
        responses = futureOfList.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
        responses = new ArrayList<>();
        for (ListenableFuture<T> future : futures) {
            if (future.isDone()) {
                responses.add(Uninterruptibles.getUninterruptibly(future));
            }
        }
    }
    return responses;
}
0 голосов
/ 21 декабря 2018

Так что я немного повозился (до сегодняшнего вечера не использовал интерфейс Listenable от Guava), но я думаю, что это может сработать для вас: package basic;

import static com.google.common.util.concurrent.Futures.catching;
import static com.google.common.util.concurrent.Futures.successfulAsList;
import static com.google.common.util.concurrent.Futures.withTimeout;

import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

public class FuturesExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ListeningScheduledExecutorService scheduledExecutorService = MoreExecutors
                .listeningDecorator(Executors.newScheduledThreadPool(20));
        List<ListenableFuture<Integer>> list = new LinkedList<>();
        for (int i = 1; i <= 4; i++) {
            list.add(catching(
                    withTimeout(scheduledExecutorService.submit(getCallback(i * 1000, i)), 3, TimeUnit.SECONDS,
                            scheduledExecutorService),
                    TimeoutException.class, exception -> 0, scheduledExecutorService));
        }
        ListenableFuture<List<Integer>> result = successfulAsList(list);
        Optional<Integer> sum = result.get().stream().reduce(Integer::sum);
        System.out.println(sum.orElse(-1));
        scheduledExecutorService.shutdownNow();
    }

    private static Callable<Integer> getCallback(int timeout, int value) {
        return () -> {
            Thread.sleep(timeout);
            return value;
        };
    }
}

edit: код немного чище, когдаиспользование статического импорта для фьючерсов

...