Параллельный запрос GET для определения c сопоставления с WebFlux - PullRequest
0 голосов
/ 29 февраля 2020

Я хочу вызвать независимый запрос одновременно с WebClient. Мой предыдущий подход с RestTemplate блокировал мои темы во время ожидания ответа. Итак, я понял, что WebClient с ParallelFlux может использовать один поток более эффективно, потому что он должен планировать несколько запросов с одним потоком.

Моя конечная точка запрашивает tupel из id и location.

Метод fooFlux будет вызываться несколько тысяч раз в al oop с различными параметрами. Возвращенная карта будет применена к сохраненным ссылочным значениям.

Предыдущие попытки этого кода привели к дублированию вызовов API. Но есть еще недостаток. Размер набора mapping часто меньше размера Set<String> location. На самом деле размер результирующей карты меняется. Кроме того, это правильно время от времени. Поэтому может возникнуть проблема с завершением индекса после того, как метод вернул карту.

public Map<String, ServiceDescription> fooFlux(String id, Set<String> locations) {
    Map<String, ServiceDescription> mapping = new HashMap<>();
    Flux.fromIterable(locations).parallel().runOn(Schedulers.boundedElastic()).flatMap(location -> {
        Mono<ServiceDescription> sdMono = getServiceDescription(id, location);
        Mono<Mono<ServiceDescription>> sdMonoMono = sdMono.flatMap(item -> {
            mapping.put(location, item);
            return Mono.just(sdMono);
        });
        return sdMonoMono;
    }).then().block();
    LOGGER.debug("Input Location size: {}", locations.size());
    LOGGER.debug("Output Location in map: {}", mapping.keySet().size());
    return mapping;
}

Обработка запроса Get

private Mono<ServiceDescription> getServiceDescription(String id, String location) {
    String uri = URL_BASE.concat(location).concat("/detail?q=").concat(id);
    Mono<ServiceDescription> serviceDescription =
                    webClient.get().uri(uri).retrieve().onStatus(HttpStatus::isError, clientResponse -> {
                        LOGGER.error("Error while calling endpoint {} with status code {}", uri,
                                        clientResponse.statusCode());
                        throw new RuntimeException("Error while calling Endpoint");
                    }).bodyToMono(ServiceDescription.class).retryBackoff(5, Duration.ofSeconds(15));
    return serviceDescription;
}

Ответы [ 2 ]

1 голос
/ 02 марта 2020
public Map<String, ServiceDescription> fooFlux(String id, Set<String> locations) {
    return Flux.fromIterable(locations)
               .flatMap(location -> getServiceDescription(id, location).map(sd -> Tuples.of(location, sd)))
               .collectMap(Tuple2::getT1, Tuple2::getT2)
               .block();
}

Примечание: оператор flatMap в сочетании с вызовом WebClient обеспечивает одновременное выполнение, поэтому нет необходимости использовать ParallelFlux или любой Scheduler.

1 голос
/ 29 февраля 2020

Реактивный код выполняется, когда вы подписываетесь на производителя. Блок подписывается, и так как вы дважды вызываете блок (один раз для Mono, но снова возвращаете Mono, а затем вызываете блок для ParallelFlux), Mono выполняется дважды.

    List<String> resultList = listMono.block();
    mapping.put(location, resultList);
    return listMono;

Попробуйте вместо этого что-то вроде следующего (не проверено):

    listMono.map(resultList -> {
       mapping.put(location, resultList);
       return Mono.just(listMono);
    });

Тем не менее, модель Reactive Programming довольно сложна, поэтому рассмотрите возможность работы с @Async и Future / AsyncResult вместо этого, если речь идет только о вызове удаленного звоните параллельно, как предлагали другие. Вы по-прежнему можете использовать WebClient (RestTemplate, кажется, находится на пути к устареванию), но просто вызовите блок сразу после bodyToMono.

...