Как собрать результаты последовательного вызова асинхронного API? - PullRequest
0 голосов
/ 07 сентября 2018

У меня есть асинхронный API, который по существу возвращает результаты через нумерацию страниц

public CompletableFuture<Response> getNext(int startFrom);

Каждый объект Response содержит список смещений от startFrom и флаг, указывающий, остались ли еще элементы, и, следовательно, еще один getNext() запрос на выполнение.

Я хотел бы написать метод, который просматривает все страницы и извлекает все смещения. Я могу написать это синхронно, вот так

int startFrom = 0;
List<Integer> offsets = new ArrayList<>();

for (;;) {
    CompletableFuture<Response> future = getNext(startFrom);
    Response response = future.get(); // an exception stops everything
    if (response.getOffsets().isEmpty()) {
        break; // we're done
    }
    offsets.addAll(response.getOffsets());
    if (!response.hasMore()) {
        break; // we're done
    }
    startFrom = getLast(response.getOffsets());
}

Другими словами, мы вызываем getNext() с startFrom в 0. Если выдается исключение, мы закорачиваем весь процесс. В противном случае, если нет смещений, мы завершаем. Если есть смещения, мы добавляем их в основной список. Если больше не осталось извлечь, мы завершаем. В противном случае мы сбрасываем startFrom до последнего смещения, которое мы получили, и повторяем.

В идеале я хочу сделать это без блокировки с помощью CompletableFuture::get() и возврата CompletableFuture<List<Integer>>, содержащего все смещения.

Как я могу это сделать? Как я могу составить фьючерсы, чтобы собрать их результаты?


Я думаю о "рекурсивном" (на самом деле не в исполнении, а в коде)

private CompletableFuture<List<Integer>> recur(int startFrom, List<Integer> offsets) {
    CompletableFuture<Response> future = getNext(startFrom);
    return future.thenCompose((response) -> {
        if (response.getOffsets().isEmpty()) {
            return CompletableFuture.completedFuture(offsets);
        }
        offsets.addAll(response.getOffsets());
        if (!response.hasMore()) {
            return CompletableFuture.completedFuture(offsets);
        }
        return recur(getLast(response.getOffsets()), offsets);
    });
}

public CompletableFuture<List<Integer>> getAll() {
    List<Integer> offsets = new ArrayList<>();
    return recur(0, offsets);
}

Мне это не нравится, с точки зрения сложности. Можем ли мы сделать лучше?

Ответы [ 2 ]

0 голосов
/ 13 сентября 2018

Я также хотел бы попробовать на этом EA Async , так как он реализует поддержку Java для async / await ( на основе C # ). Поэтому я просто взял ваш исходный код и преобразовал его:

public CompletableFuture<List<Integer>> getAllEaAsync() {
    int startFrom = 0;
    List<Integer> offsets = new ArrayList<>();

    for (;;) {
        // this is the only thing I changed!
        Response response = Async.await(getNext(startFrom));
        if (response.getOffsets().isEmpty()) {
            break; // we're done
        }
        offsets.addAll(response.getOffsets());
        if (!response.hasMore()) {
            break; // we're done
        }
        startFrom = getLast(response.getOffsets());
    }

    // well, you also have to wrap your result in a future to make it compilable
    return CompletableFuture.completedFuture(offsets);
}

Затем вам нужно обработать ваш код , например, добавив

Async.init();

в начале вашего main() метода.

Я должен сказать: это действительно похоже на волшебство!

За кулисами EA Async замечает, что внутри метода есть вызов Async.await(), и переписывает его для обработки всей рекурсии thenCompose() / thenApply() /. Единственное требование - ваш метод должен возвращать CompletionStage или CompletableFuture.

Это действительно Асинхронный код стал проще !

0 голосов
/ 13 сентября 2018

Для этого упражнения я сделал обобщенную версию этого алгоритма, но он довольно сложный, потому что вам необходимо:

  1. начальное значение для вызова службы (startFrom)
  2. сам сервисный вызов (getNext())
  3. контейнер результатов для накопления промежуточных значений (offsets)
  4. аккумулятор (offsets.addAll(response.getOffsets()))
  5. условие для выполнения «рекурсии» (response.hasMore())
  6. функция для вычисления следующего ввода (getLast(response.getOffsets()))

, так что это дает:

public <T, I, R> CompletableFuture<R> recur(T initialInput, R resultContainer,
        Function<T, CompletableFuture<I>> service,
        BiConsumer<R, I> accumulator,
        Predicate<I> continueRecursion,
        Function<I, T> nextInput) {
    return service.apply(initialInput)
            .thenCompose(response -> {
                accumulator.accept(resultContainer, response);
                if (continueRecursion.test(response)) {
                    return recur(nextInput.apply(response),
                            resultContainer, service, accumulator,
                            continueRecursion, nextInput);
                } else {
                    return CompletableFuture.completedFuture(resultContainer);
                }
            });
}

public CompletableFuture<List<Integer>> getAll() {
    return recur(0, new ArrayList<>(), this::getNext,
            (list, response) -> list.addAll(response.getOffsets()),
            Response::hasMore,
            r -> getLast(r.getOffsets()));
}

Небольшое упрощение recur() возможно, заменив initialInput на CompletableFuture, возвращаемый результатом первого вызова, resultContainer и accumulator можно объединить в один Consumerи service можно затем объединить с функцией nextInput.

Но это дает немного более сложный getAll():

private <I> CompletableFuture<Void> recur(CompletableFuture<I> future,
        Consumer<I> accumulator,
        Predicate<I> continueRecursion,
        Function<I, CompletableFuture<I>> service) {
    return future.thenCompose(result -> {
        accumulator.accept(result);
        if (continueRecursion.test(result)) {
            return recur(service.apply(result), accumulator, continueRecursion, service);
        } else {
            return CompletableFuture.completedFuture(null);
        }
    });
}

public CompletableFuture<List<Integer>> getAll() {
    ArrayList<Integer> resultContainer = new ArrayList<>();
    return recur(getNext(0),
            result -> resultContainer.addAll(result.getOffsets()),
            Response::hasMore,
            r -> getNext(getLast(r.getOffsets())))
            .thenApply(unused -> resultContainer);
}
...