Генерация Spring Flux из последовательности вызовов в сети - PullRequest
2 голосов
/ 26 июня 2019

Я вызываю API api.magicthegathering.io/v1/cards, используя реактивный клиент Spring WebFlux. Ответ представляет собой страницу из 100 карточек вместе с заголовками, содержащими ссылки для «следующей» и «последней» страниц, например, «last» - это api.magicthegathering.io/v1/cards?page=426 (а «next» - просто n + 1). Я хочу сгенерировать Flux<Card>, который подает каждую карту отдельно, с одной точкой входа, например Flux<Card> getAllCards().

В настоящее время у меня есть компонент CardsClient, который возвращает Mono<CardPage>. У CardPage есть метод cards(), который возвращает все карточки (это представление модели ответа API 1: 1). Кроме того, у меня есть компонент CardCatalog с этим методом getAllCards().

Я пытался использовать Flux::expand и Flux::generate, что несколько работает, но у этих реализаций есть недостатки.

Вот фрагмент моей текущей итерации CardCatalog::getAllCards(). Проблема в том, что рекурсивный характер expand вызывает избыточные вызовы к client::getNextPage; очевидно, я не использую правильный метод.

  @Override
  public Flux<Card> getAllCards() {
    return client.getFirstPage().flux().expand(client::getNextPage)
        .map(Page::cards)
        .flatMap(Flux::fromIterable)
        .map(mapper::convert)
        .cache();
  }

Ранее я использовал generate, но проблема в том, что он всегда будет захватывать все страницы (довольно медленно), даже если подписчик решит только take(20) карточек:

 @Override
  public Flux<Card> getAllCards() {
    final Flux<Page> pageFlux =
        generate(client::getFirstPage, (response, sink) -> {
          final var page = response.block();
          sink.next(page);

          if (page.next().isPresent()) {
            return client.getNextPage(page);
          }
          sink.complete();
          return null;
        });

    return pageFlux.flatMapIterable(Page::cards).map(mapper::convert);
  }

Полный код здесь: https://github.com/myersadamk/mtg-api-client

Используя expand, я добавил отпечаток к client::getNextPage(). Как видите, график создает избыточные вызовы.

Getting https://api.magicthegathering.io/v1/cards?page=1
Getting https://api.magicthegathering.io/v1/cards?page=7
Getting https://api.magicthegathering.io/v1/cards?page=2
Getting https://api.magicthegathering.io/v1/cards?page=8
Getting https://api.magicthegathering.io/v1/cards?page=3
Getting https://api.magicthegathering.io/v1/cards?page=9
Getting https://api.magicthegathering.io/v1/cards?page=4
Getting https://api.magicthegathering.io/v1/cards?page=10
Getting https://api.magicthegathering.io/v1/cards?page=5
Getting https://api.magicthegathering.io/v1/cards?page=11
Getting https://api.magicthegathering.io/v1/cards?page=6
Getting https://api.magicthegathering.io/v1/cards?page=12
Getting https://api.magicthegathering.io/v1/cards?page=7

Я хочу что-то еще подобное:

Getting https://api.magicthegathering.io/v1/cards?page=1
Getting https://api.magicthegathering.io/v1/cards?page=2
Getting https://api.magicthegathering.io/v1/cards?page=3

(Заключительное примечание: конечно, было бы быстрее распараллелить это и вызвать URI напрямую, но немного глупо обходить следующий / последний механизм и жестко кодировать URI. Я могу в конечном итоге сделать это, но все же хочу взломать этот орех.)

1 Ответ

1 голос
/ 26 июня 2019

Хорошо, я придумала что-то, что работает.Я решил использовать подход с подсчетом страниц, чтобы попробовать распараллеливание, хотя, по общему признанию, это не так быстро, поскольку сетевой ввод-вывод остается узким местом.Я, вероятно, вернусь к сканированию ссылки заголовка и использую кеширование.Грубо говоря, магические числа и все, вот как это выглядит:

  @Override
  public Flux<Card> getAllCards() {
    return client.getPageCount().flatMapMany(pageCount ->
        Flux.concat(
            range(1, pageCount)
                .parallel(pageCount / 6).runOn(Schedulers.parallel())
                .map(client::getPage)
        ).map(Page::cards).flatMap(Flux::fromIterable).map(mapper::convert)
    );
  }
...