Spring WebFlux Web Client - Итерация постраничного API REST - PullRequest
0 голосов
/ 13 мая 2018

Мне нужно получить элементы со всех страниц API REST с возможностью просмотра страниц.Мне также нужно начать обработку элементов, как только они будут доступны, не нужно ждать загрузки всех страниц.Для этого я использую Spring WebFlux и его WebClient и хочу вернуть Flux<Item>.Кроме того, используемый мной REST API ограничен по скорости, и каждый ответ на него содержит заголовки с подробной информацией о текущих пределах:

  • Размер текущего окна
  • Оставшееся время втекущее окно
  • Запрос квоты в окне
  • Запросы, оставленные в текущем окне

Ответ на запрос одной страницы выглядит следующим образом:

{
    "data": [],
    "meta": {
      "pagination": {
        "total": 10,
        "current": 1
      }
    }
}

Массив данных содержит фактические элементы, в то время как метаобъект содержит информацию о разбиении на страницы.

Мое текущее решение сначала выполняет "фиктивный" запрос, просто чтобы получить общее количество страниц и ограничения скорости.

Mono<T> paginated = client.get()
    .uri(uri)
    .exchange()
    .flatMap(response -> {                  
        HttpHeaders headers = response.headers().asHttpHeaders();

        Limits limits = new Limits();
        limits.setWindowSize(headers.getFirst("X-Window-Size"));
        limits.setWindowRemaining(headers.getFirst("X-Window-Remaining"));
        limits.setRequestsQuota(headers.getFirst("X-Requests-Quota");
        limits.setRequestsLeft(headers.getFirst("X-Requests-Remaining");

        return response.bodyToMono(Paginated.class)
                .map(paginated -> { 
                    paginated.setLimits(limits);
                    return paginated;
                });
    });

После этого я посылаю поток, содержащий номера страниц, и для каждой страницы я делаю запрос REST API, каждый запрос достаточно задерживается, чтобы он не превысил предел, и возвращаю потокизвлеченные элементы:

return paginated.flatMapMany(paginated -> {
    return Flux.range(1, paginated.getMeta().getPagination().getTotal())
            .delayElements(Duration.ofMillis(paginated.getLimits().getWindowRemaining() / paginated.getLimits().getRequestsQuota()))
            .flatMap(page -> {
                return client.get()
                        .uri(pageUri)
                        .retrieve()
                        .bodyToMono(Item.class)
                        .flatMapMany(p -> Flux.fromIterable(p.getData()));
            });
});

Это работает, но я не доволен этим, потому что:

  • Это делает первоначальный "фиктивный" запрос для получения количества страниц, изатем повторяет тот же запросt для получения фактических данных.
  • Он получает ограничения скорости только при первоначальном запросе и предполагает, что ограничения не изменятся (например, это единственный, использующий API) - что может быть неверно,в этом случае он получит ошибку, превышающую лимит.

Поэтому мой вопрос заключается в том, как выполнить его рефакторинг, чтобы он не нуждался в первоначальном запросе (а скорее получить ограничения, номера страниц и данныес первого запроса и продолжите работу на всех страницах, обновляя (и соблюдая) ограничения.

1 Ответ

0 голосов
/ 23 ноября 2018

Я думаю, этот код будет делать то, что вы хотите.Идея состоит в том, чтобы создать поток, который выполняет вызов на ваш ресурсный сервер, но в процессе обработки ответа, чтобы добавить новое событие в этот поток, чтобы иметь возможность выполнить вызов на следующей странице.

Код состоит из:

Простой оболочки, содержащей следующую вызываемую страницу и задержку ожидания перед выполнением вызова.

private class WaitAndNext{
    private String next;
    private long delay;
}

FluxProcessor, который выполняет HTTP-вызов и обрабатываетответ:

FluxProcessor<WaitAndNext, WaitAndNext> processor= DirectProcessor.<WaitAndNext>create();
FluxSink<WaitAndNext> sink=processor.sink();

processor
    .flatMap(x-> Mono.just(x).delayElement(Duration.ofMillis(x.delay)))
    .map(x-> WebClient.builder()
    .baseUrl(x.next)
    .defaultHeader("Accept","application/json")
    .build())
    .flatMap(x->x.get()        
                 .exchange()
                 .flatMapMany(z->manageResponse(sink, z))
            )
    .subscribe(........);

Я разделил код с помощью метода, который только управляет ответом: он просто разворачивает ваши данные и добавляет новое событие в приемник (событие, вызываемое на следующей странице после указанной задержки)

private Flux<Data> manageResponse(FluxSink<WaitAndNext> sink, ClientResponse resp) {

    if (resp.statusCode()!= HttpStatus.OK){
        sink.error(new IllegalStateException("Status code invalid"));
    }

    WaitAndNext wn=new WaitAndNext();
    HttpHeaders headers=resp.headers().asHttpHeaders();
    wn.delay= Integer.parseInt(headers.getFirst("X-Window-Remaining"))/ Integer.parseInt(headers.getFirst("X-Requests-Quota"));

    return resp.bodyToMono(Item.class)
        .flatMapMany(p -> {
            if (p.paginated.current==p.paginated.total){
                sink.complete();
            }else{
                wn.next="https://....?page="+(p.paginated.current+1);
                sink.next(wn);
            }
            return Flux.fromIterable(p.getData());
        });
}

Теперь нам просто нужно инициализировать систему, вызвав поиск первой страницы без задержки:

WaitAndNext wn=new WaitAndNext();
wn.next="https://....?page=1";
wn.delay=0;
sink.next(wn);
...