Flux collectList () в списке бирж WebClient всегда пусто - PullRequest
0 голосов
/ 27 апреля 2020

Я пытаюсь выполнить запросы списка, используя WebClient, затем отфильтровать их, найдя первый успешный (если есть), и вернуть его. Или вернитесь к ответу по умолчанию, если не удалось.

Проблема, с которой я сталкиваюсь, заключается в том, что когда я вызываю .collectList() на Flux<ServerResponse>, список всегда пуст. Я ожидал бы, что список будет содержать N число ServerResponse на основе количества запросов, которые я сделал ранее.

public Mono<ServerResponse> retry(ServerRequest request) {
    return Flux.fromIterable(request.headers().header(SEQUENCE_HEADER_NAME))
            .map(URI::create)
            // Build a "list" of responses
            .flatMap(uri -> webClientBuilder.baseUrl(uri.toString()).build()
                    .method(Objects.requireNonNull(request.method()))
                    .headers(headers -> request.headers().asHttpHeaders().forEach((key, values) -> {
                        if (!SEQUENCE_HEADER_NAME.equals(key)) {
                            headers.addAll(key, values);
                        }
                    }))
                    .body(BodyInserters.fromDataBuffers(request.body(BodyExtractors.toDataBuffers())))
                    .exchange()
                    .flatMap(clientResponse -> ServerResponse.status(clientResponse.statusCode())
                            .headers(headers -> headers.addAll(clientResponse.headers().asHttpHeaders()))
                            .body(BodyInserters.fromDataBuffers(clientResponse.body(BodyExtractors.toDataBuffers()))))
            )
            // "Wait" for all of them to complete so we can filter
            .collectList()
            .flatMap(clientResponses -> {
                List<ServerResponse> filteredResponses = clientResponses.stream()
                        .filter(response -> response.statusCode().is2xxSuccessful())
                        .collect(Collectors.toList());

                if (filteredResponses.isEmpty()) {
                    log.error("No request succeeded; defaulting to {}", HttpStatus.BAD_REQUEST.toString());
                    return ServerResponse.badRequest().build();
                }

                if (filteredResponses.size() > 1) {
                    log.error("Multiple requests succeeded; defaulting to {}", HttpStatus.BAD_REQUEST.toString());
                    return ServerResponse.badRequest().build();
                }

                return Mono.just(filteredResponses.get(0));
            });
}

Любые идеи, почему .collectList() всегда возвращает пустой список

1 Ответ

0 голосов
/ 29 апреля 2020

Что ж, мне кажется, у вас запутанное требование: вы хотите First Mono, который отвечает, но вы пытаетесь поместить эту функциональность в Flux, который предназначен для обработки всех элементов в потоке эффективно. Mono в Webflux предназначен для создания потока, который будет эффективно выполнять серию преобразований элемента в потоке. Ничего из того, что вам нужно для тестирования множества URI для первого, который успешно работает, не является подходящим для WebFlux, поэтому я должен задать вопрос, зачем пытаться внедрить это в каркас.

Вы можете утверждать, что Flux дает вам лучшую асинхронную обработку, но я не думаю, что это тот случай, когда это набор WebClient вызовов. WebClient - это все еще скрытый HTTP, поэтому каждый элемент в потоке останавливается и начинается около WebClient. Если вы хотите сделать HTTP асинхронно, вы должны использовать ThreadPool и Callable.

...