Запретить Spring WebFlux WebClient выполнять новый обмен при новой подписке - PullRequest
1 голос
/ 25 марта 2020

DefaultWebClient имеет exchange, реализованный как:

@Override
public Mono<ClientResponse> exchange() {
    ClientRequest request = (this.inserter != null ?
            initRequestBuilder().body(this.inserter).build() :
            initRequestBuilder().build());
    return Mono.defer(() -> exchangeFunction.exchange(request)
            .checkpoint("Request to " + this.httpMethod.name() + " " + this.uri + " [DefaultWebClient]")
            .switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR));
}

Как вы можете видеть выше, вызов exchangeFunction.exchange обернут в Mono.defer, поэтому он будет выполняться всякий раз, когда что-то подписка на возвращенное Mono<ClientResponse>.

Однако в моем очень конкретном c случае использования я не хочу повторно выполнять обмен, учитывая приведенный ниже упрощенный код:

final WebClient webClient = WebClient.create("http://some-base-url");
final AtomicReference<Mono<ClientResponse>> responseRef = new AtomicReference<>(null);
Flux.fromIterable(Arrays.asList(1, 2, 3))
    .flatMap(num -> {
        if (...some condition...) {
            return responseRef.updateAndGet(response -> response == null 
                                                        ? webClient.get().uri("/some-path").exchange()
                                                        : response)
                              .flatMap(response -> {...do something with num and response...});
        } else {
            return Mono.just(...something...);
        }
    })
    ...

Как вы можете видеть выше в моем случае использования, я пытался использовать AtomicReference, чтобы лениво получить Mono<ClientResponse>, чтобы HTTP-запрос не повторялся снова и снова.

Это не работает должным образом поскольку подписавшаяся на Mono<ClientResponse>, опубликованная exchange(), сделавшая что-то с номером-н-и-ответом *1011* снова и снова вызовет свой внутренний exchangeFunction.exchange.

Можно ли обернуть опубликованное Mono<ClientResponse> с чем-то компенсировать эффект Mono.defer? Или есть способ обойти его, не меняя структуру кода моего варианта использования?

========== Работоспособное решение ==========

Вдохновленный принятым ответом, я изменил свой код следующим образом:

final WebClient webClient = WebClient.create("http://some-base-url");
final AtomicReference<Mono<ClientResponse>> responseRef = new AtomicReference<>(null);
Flux.fromIterable(Arrays.asList(1, 2, 3))
    .flatMap(num -> {
        if (...some condition...) {
            return responseRef.updateAndGet(response -> response == null 
                                                        ? webClient.get().uri("/some-path").exchange().cache()
                                                        : response)
                              .flatMap(response -> {...do something with num and response...});
        } else {
            return Mono.just(...something...);
        }
    })
    ...

Обратите внимание на cache() после exchange(). Кэш Mono превращает его в горячий источник и кеширует последние излучаемые сигналы для дальнейших подписчиков. Завершение и ошибка также будут воспроизведены.

1 Ответ

1 голос
/ 25 марта 2020

Вы можете сделать что-то вроде этого:

final WebClient webClient = WebClient.create("http://localhost:8080");
Flux<String> data = webClient
                .get()
                .uri("test")
                .exchange()
                //do whatever you need on response
                .flatMap(clientResponse -> clientResponse.bodyToMono(String.class))
                .flux()
                //Turn this Flux into a hot source and cache last emitted signals for further Subscriber
                .replay()
                //Connects this ConnectableFlux to the upstream source when the first Subscriber subscribes.
                .autoConnect();

Flux.range(0, 10).flatMap(integer -> data).log().subscribe();

Вы можете сделать:

 Mono<String> data = webClient
                .get()
                .uri("test")
                .exchange()
                .flatMap(clientResponse -> clientResponse.bodyToMono(String.class))
                .cache();


 Flux.range(0, 10).flatMap(integer -> {
        if (integer % 2 == 0)
            return data;
        else
            return Mono.empty();
    }).log().subscribe();
...