Как дождаться завершения закрытого асинхронного блока - PullRequest
0 голосов
/ 18 июня 2020

У меня проблема с stream().forEach он не заканчивается sh вовремя до возврата метода, вот как:

Моя сущность:

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class Foo {
    private String a;
    private int b;
    private int c;
    private String d;
    private String e;
}

I есть метод, который вызывает внешнюю службу, которая получает список Foo, а затем для каждого члена этого списка вызывает две другие внешние службы для заполнения полей d и e:

public List<Foo> getOrdresListe() {
    Foo[] fooArray = externalServiceOne.getFooList();
    Arrays.stream(fooArray).forEach((f) -> {
        CompletableFuture.supplyAsync(AsyncUtils.supplierWithCustomDependencies(() -> {
            Dob dob = externalServiceTwo.getDeeEntity(f.getA());
            f.setD(dob.getD());
            Efo efo = externalServiceThree.getEeeEntity(f.getA());
            f.setE(efo.getE());
            return f;
        }));
    });
    List<Foo> fooList = Arrays.asList(fooArray);
    return fooList; // when this statement is reached d and e fields are null.
}

Из-за некоторых проблем с производительностью (и некоторых передовых методов) я вызываю externalServiceTwo.getDeeEntity и externalServiceThree.getEeeEntity асинхронно с настраиваемым поставщиком, чтобы установить некоторые зависимости при вызове служб. Но основная проблема заключается в том, что при возврате полей fooList d и e нулевые.

Мой вопрос в том, как дождаться всех выполнений asyn c fo fini sh перед возвратом fooList?

Ответы [ 3 ]

1 голос
/ 18 июня 2020

Вы можете просто запомнить асинхронные c задачи и дождаться их завершения вручную:

public List<Foo> getOrdresListe() {
    Foo[] fooArray = externalServiceOne.getFooList();

    final List<CompletableFuture<Foo>> futures = new ArrayList<>(); // remember each

    Arrays.stream(fooArray).forEach((f) -> {
        futures.add(CompletableFuture.supplyAsync(
                AsyncUtils.supplierWithCustomDependencies(() -> {
            Dob dob = externalServiceTwo.getDeeEntity(f.getA());
            f.setD(dob.getD());
            Efo efo = externalServiceThree.getEeeEntity(f.getA());
            f.setE(efo.getE());
            return f;
        })));
    });

    for (CompletableFuture<Foo> future : futures) {
        future.join(); // wait for each
    }

    List<Foo> fooList = Arrays.asList(fooArray);
    return fooList;
}
1 голос
/ 29 июня 2020

Можно попробовать что-то вроде этого: отправить запрос Asyn c и создать список завершаемых фьючерсов типа Foo, а затем использовать join в качестве терминальной операции, как предложил @boobalan; вы получаете результаты таким образом, вы можете дождаться всех выполнений asyn c fo fini sh, прежде чем возвращать fooList.


List<foo> fooData = externalServiceOne.getFooList();

//Sends the request asynchronously and returns the Completable Future of type Foo which can be later used to get the results using `get` or `join`
private List<CompletableFuture<Foo>> sendRequest() {
    return fooData.stream().map(this::computeAsync).collect(Collectors.toList());
}

private CompletableFuture<Foo> computeAsync(Foo f) {
    return CompletableFuture.supplyAsync(() -> 
CompletableFuture.supplyAsync(AsyncUtils.supplierWithCustomDependencies(() -> {
                Dob dob = externalServiceTwo.getDeeEntity(f.getA());
                f.setD(dob.getD());
                Efo efo = externalServiceThree.getEeeEntity(f.getA());
                f.setE(efo.getE());
                return f 
           });

private List<Foo> processResponse() {
   List<CompletableFuture<Foo>> futureResult = sendRequest();

   List<Foo> fooList = futureResult.stream()
            .map(CompletableFuture::join)
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
     return fooList;
    }

}
1 голос
/ 18 июня 2020

Почему бы вам не создать все CompletableFuture s в качестве промежуточной операции и не дождаться, пока все asyn c выполнения завершатся sh в терминальной операции.

Arrays.stream(fooArray).map((f) -> CompletableFuture.supplyAsync(AsyncUtils.supplierWithCustomDependencies(() -> {
                Dob dob = externalServiceTwo.getDeeEntity(f.getA());
                f.setD(dob.getD());
                Efo efo = externalServiceThree.getEeeEntity(f.getA());
                f.setE(efo.getE());
                return f;
            })
    )).forEach(fooCompletableFuture -> {
        try {
            fooCompletableFuture.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    });
...