Отфильтруйте дубликаты для CompletableFuture - PullRequest
0 голосов
/ 22 мая 2018

Я хочу отфильтровать дубликаты после первого CompletableFuture, а затем вызвать второй этап, используя другой CompletableFuture.То, что я пробовал:

@FunctionalInterface
public interface FunctionWithExceptions<T, R, E extends Exception> {
    R process(T t) throws E;
}


public static <T> Predicate<T> distinctByKey(FunctionWithExceptions<? super T, ?, ?> keyExtractor) {
    Set<Object> seen = ConcurrentHashMap.newKeySet();
    return t -> {
        String key = "";
        try {
            key = (String) keyExtractor.process(t);
        } catch (Exception e) {
            log.info("Get instanceIp failed!");
        }
        return seen.add(key);
    };
}

List<CompletableFuture<InstanceDo>> instanceFutures = podNames.stream()
            .map(podName -> CompletableFuture.supplyAsync(RethrowExceptionUtil.rethrowSupplier(() -> {
                PodDo podDo = getPodRetriever().getPod(envId, podName);
                podDoList.add(podDo);
                return podDo;
            }), executor))
            .map(future -> future.thenApply(podDo -> podDo.getInstanceName()))
            .filter(distinctByKey(CompletableFuture::get))
            .map(future -> future.thenCompose(instanceName ->
                    CompletableFuture.supplyAsync(() -> get(envId, instanceName), executor)))
            .collect(Collectors.toList());

Как видите, distinctByKey будет вызывать get, что напрямую сделает последовательность параллелизм до .

Что я должен сделать, чтобы снова сделать его CONCURRENT , но при этом сохранить отличную функцию?

ИЛИ

У меня есть только один выбор?

Дождаться завершения всего первого этапа и затем запустить второй этап ?

Ответы [ 2 ]

0 голосов
/ 23 мая 2018

Небольшое улучшение по сравнению с отправленным вами ответом может заключаться в использовании ConcurrentHashMap в качестве своего рода кэша, так что ваш окончательный список содержит те же результаты, независимо от порядка, в котором вы их получили:

Map<Integer, CompletableFuture<Integer>> seen = new ConcurrentHashMap<>();
List<CompletableFuture<Integer>> intFutures = Stream.iterate(0, i -> i + 1)
        .limit(5)
        .map(i -> CompletableFuture.supplyAsync(() -> runStage1(i)))
        .map(cf -> cf.thenCompose(result ->
                seen.computeIfAbsent(
                        result, res -> CompletableFuture.supplyAsync(() -> runStage2(res))
                )
        ))
        .collect(Collectors.toList());

Обратите внимание, что важно, чтобы функция, переданная в computeIfAbsent(), немедленно возвращалась (например, с использованием supplyAsync()), потому что она удерживает блокировку внутри карты во время выполнения.Кроме того, эта функция не должна пытаться изменить карту seen, поскольку может вызвать проблемы .

При таком изменении выходные данные могут быть, например:

stage - 1: 1
stage - 1: 0
stage - 1: 2
stage - 2: 1
stage - 2: 2
stage - 1: 3
stage - 2: 0
stage - 1: 4
0
1
2
0
1

Кроме того, это позволяет проверить карту seen после завершения всех фьючерсов, чтобы получить уникальные результаты.

0 голосов
/ 22 мая 2018

Я только что написал простую демонстрацию, чтобы решить эту проблему, но я действительно не знаю, надежно это или нет.Но, по крайней мере, он гарантирует, что второй этап может быть ускорен с помощью Set<Object> seen = ConcurrentHashMap.newKeySet();.

public static void main(String... args) throws ExecutionException, InterruptedException {
        Set<Object> seen = ConcurrentHashMap.newKeySet();
        List<CompletableFuture<Integer>> intFutures = Stream.iterate(0, i -> i+1)
                .limit(5)
                .map(i -> CompletableFuture.supplyAsync(() -> {
                    int a = runStage1(i);
                    if (seen.add(a)) {
                        return a;
                    } else {
                        return -1;
                    }}))
                .map(future -> future.thenCompose(i -> CompletableFuture.supplyAsync(() -> {
                    if (i > 0) {
                        return runStage2(i);
                    } else {
                        return i;
                    }})))
                .collect(Collectors.toList());
        List<Integer> resultList = new ArrayList<>();
        try {
            for (CompletableFuture<Integer> future: intFutures) {
                resultList.add(future.join());
            }
        } catch (Exception ignored) {
            ignored.printStackTrace();
            out.println("Future failed!");
        }
        resultList.stream().forEach(out::println);
    }

    private static Integer runStage1(int a) {
        out.println("stage - 1: " + a);
        try {
            Thread.sleep(500 + Math.abs(new Random().nextInt()) % 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return Integer.valueOf(a % 3);
    }

    private static Integer runStage2(int b) {
        out.println("stage - 2: " + b);
        try {
            Thread.sleep(200 + Math.abs(new Random().nextInt()) % 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return Integer.valueOf(b);
    }

Возвращая специальные значения на этапе первый , когда он дублируется , а затем на этапе второй через специальное значение (-1) я могу игнорировать трудоемкие вычисления второго этапа.

Выходные данные действительно отфильтровывают некоторые избыточные вычисления второй ступени .

stage - 1: 0
stage - 1: 1
stage - 1: 2
stage - 1: 3
stage - 2: 2 // 
stage - 2: 1 //
stage - 1: 4
0
1
2
-1
-1

Я думаю, что это не очень хорошее решение.Но что я могу оптимизировать, чтобы сделать его лучше?

...