Использование CompletableFuture в функции фильтра - PullRequest
0 голосов
/ 07 декабря 2018

У меня есть сценарий использования, в котором я хочу отфильтровать несколько элементов в списке на основе сетевого вызова, который я выполняю для элемента.Для этого я использую потоки, фильтр и Completable Future.Цель состоит в том, чтобы выполнить асинхронное выполнение, чтобы операция стала эффективной.Псевдокод для этого упоминается ниже.

public List<Integer> afterFilteringList(List<Integer> initialList){
   List<Integer> afterFilteringList =initialList.stream().filter(element -> {
        boolean valid = true;
        try{
            valid = makeNetworkCallAndCheck().get();
        } catch (Exception e) {

        }
        return valid;
    }).collect(Collectors.toList());

    return afterFilteringList;
}
public CompletableFuture<Boolean> makeNetworkCallAndCheck(Integer value){
   return CompletableFuture.completedFuture(resultOfNetWorkCall(value);
 }

Вопрос, который у меня возникает здесь, заключается в том, выполняю ли я эту операцию самим асинхронным способом? (Поскольку я использую функцию 'get' внутри фильтразаблокирует ли это выполнение и сделает его только последовательным?)

Ответы [ 3 ]

0 голосов
/ 07 декабря 2018

Когда вы немедленно вызываете get, вы действительно уничтожаете преимущество асинхронного выполнения.Решение состоит в том, чтобы сначала собрать все асинхронные задания, прежде чем присоединиться.

public List<Integer> afterFilteringList(List<Integer> initialList){
    Map<Integer,CompletableFuture<Boolean>> jobs = initialList.stream()
        .collect(Collectors.toMap(Function.identity(), this::makeNetworkCallAndCheck));
    return initialList.stream()
        .filter(element -> jobs.get(element).join())
        .collect(Collectors.toList());
}
public CompletableFuture<Boolean> makeNetworkCallAndCheck(Integer value){
   return CompletableFuture.supplyAsync(() -> resultOfNetWorkCall(value));
}

Конечно, метод makeNetworkCallAndCheck также должен инициировать действительно асинхронную операцию.Вызов метода синхронно и возврат completedFuture недостаточен.Я предоставил здесь простую примерную асинхронную операцию, но для операций ввода-вывода вы, вероятно, захотите указать свои собственные Executor, соответствующие количеству одновременных соединений, которые вы хотите разрешить.

0 голосов
/ 07 декабря 2018

Collection.parallelStream() - это простой способ сделать асинхронный материал для коллекции.Вы можете изменить свой код следующим образом:

public List<Integer> afterFilteringList(List<Integer> initialList){
    List<Integer> afterFilteringList =initialList
            .parallelStream()
            .filter(this::makeNetworkCallAndCheck)
            .collect(Collectors.toList());

    return afterFilteringList;
}
public Boolean makeNetworkCallAndCheck(Integer value){
    return resultOfNetWorkCall(value);
}

Вы можете настроить своего исполнителя, набрав таким образом .И порядок результата гарантирован в соответствии с this .

Я написал следующий код, чтобы проверить мои слова.

public class  DemoApplication {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool(50);
        final List<Integer> integers = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            integers.add(i);
        }
        long before = System.currentTimeMillis();
        List<Integer> items = forkJoinPool.submit(() ->
                integers
                        .parallelStream()
                        .filter(it -> {
                            try {
                                Thread.sleep(10000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            return true;
                        })
                        .collect(Collectors.toList()))
                .get();
        long after = System.currentTimeMillis();
        System.out.println(after - before);
    }
}

Я создаю свой собственный ForkJoinPool и мне требуется 10019 миллисекунд, чтобы завершить 50 заданий параллельно, хотя каждое из них стоит 10000 миллисекунд.

0 голосов
/ 07 декабря 2018

Если вы используете get(), это не будет Async

get(): При необходимости ожидает завершения этого будущего, а затем возвращает его результат.

Если вы хотите обработать все запросы в Async.Вы можете использовать CompletetableFuture.allOf()

public List<Integer> filterList(List<Integer> initialList){
    List<Integer> filteredList = Collections.synchronizedList(new ArrayList());
    AtomicInteger atomicInteger = new AtomicInteger(0);
    CompletableFuture[] completableFutures = new CompletableFuture[initialList.size()];
    initialList.forEach(x->{
        completableFutures[atomicInteger.getAndIncrement()] = CompletableFuture
            .runAsync(()->{
                if(makeNetworkCallAndCheck(x)){
                    filteredList.add(x);
                }
        });
    });

    CompletableFuture.allOf(completableFutures).join();
    return filteredList;
}

private Boolean makeNetworkCallAndCheck(Integer value){
    // TODO: write the logic;
    return true;
}
...