рекурсивный вызов API с помощью WebClient и Reactor 3.0 - PullRequest
0 голосов
/ 24 марта 2019

Наконец-то я изучаю программирование в функциональном стиле с помощью Reactor. Так что я новичок в этом.

Первое, что я хочу сделать, это вызвать внешний API с помощью WebClient. Этот вызов должен быть рекурсивным, так как ответ предоставляет следующее значение параметра вызова, и мне нужно использовать его при следующем вызове, пока не будет достигнут тривиальный случай.

Вот что придумали:

    Flux.from(p -> queryUntilNow())
            .flatMap(res -> // res is object )
            .subscribe( // process )



private Flux<ApiResp> queryUntilNow() {
    return Flux.from(p -> {
        queryAPI(since)
                .doOnError(System.out::println)
                .subscribe(apiResp -> {
                    if (since == apiResp.last) return;

                    since = apiResp.last;
                    queryUntilNow();
                });
    });
}

private Flux<ApiResp> queryAPI(int last) {
    Flux<ApiResp> resp = kapi.get()
            .uri("/OHLC?pair={pair}&since={since}&interval={int}", pair, last, interval)
            .retrieve()
            .bodyToFlux(ApiResp.class);

    return resp;
}

Похоже, мне нужно немного больше подстроить свое мышление под этот стиль программирования, поэтому, пожалуйста, дайте мне несколько примеров и объяснений.

Спасибо!

1 Ответ

0 голосов
/ 24 марта 2019

Если вам просто нужно зациклить линейные результаты, которые возвращаются пакетами (в отличие от повторяющегося дерева), вы можете использовать повторяющийся поток, чья начальная точка меняется при каждом повторении.

Вот полный пример, который просто имитирует вызов API. Вы можете заменить в своем вызове WebClient в queryFrom:

public class Main {

    private static class ApiResp {
        private final int last;
        private ApiResp(int last) {
            this.last = last;
        }
    }

    public static void main(String[] args) {
        queryBetween(0, 15)
                .doOnNext(apiResp -> System.out.println(apiResp.last))
                .blockLast();
    }

    public static Flux<ApiResp> queryBetween(int startInclusive, int endExclusive) {
        // The starting point of the next iteration
        final AtomicReference<Integer> nextIterationStart = new AtomicReference<>(startInclusive);
        return Flux
                // defer will cause a new Flux with a new starting point to be created for each subscription
                .defer(() -> queryFrom(nextIterationStart.get()))
                // update the starting point of the next iteration
                .doOnNext(apiResp -> nextIterationStart.set(apiResp.last + 1))
                // repeat with a new subscription if we haven't reached the end yet
                .repeat(() -> nextIterationStart.get() < endExclusive)
                // make sure we didn't go past the end if queryFrom returned more results than we need
                .takeWhile(apiResp -> apiResp.last < endExclusive);
    }

    public static Flux<ApiResp> queryFrom(int start) {
        // simulates an api call that always returns 10 results from the starting point
        return Flux.range(start, 10)
                .map(ApiResp::new);
    }
}
...