DefaultWebClient
имеет exchange
, реализованный как:
@Override
public Mono<ClientResponse> exchange() {
ClientRequest request = (this.inserter != null ?
initRequestBuilder().body(this.inserter).build() :
initRequestBuilder().build());
return Mono.defer(() -> exchangeFunction.exchange(request)
.checkpoint("Request to " + this.httpMethod.name() + " " + this.uri + " [DefaultWebClient]")
.switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR));
}
Как вы можете видеть выше, вызов exchangeFunction.exchange
обернут в Mono.defer
, поэтому он будет выполняться всякий раз, когда что-то подписка на возвращенное Mono<ClientResponse>
.
Однако в моем очень конкретном c случае использования я не хочу повторно выполнять обмен, учитывая приведенный ниже упрощенный код:
final WebClient webClient = WebClient.create("http://some-base-url");
final AtomicReference<Mono<ClientResponse>> responseRef = new AtomicReference<>(null);
Flux.fromIterable(Arrays.asList(1, 2, 3))
.flatMap(num -> {
if (...some condition...) {
return responseRef.updateAndGet(response -> response == null
? webClient.get().uri("/some-path").exchange()
: response)
.flatMap(response -> {...do something with num and response...});
} else {
return Mono.just(...something...);
}
})
...
Как вы можете видеть выше в моем случае использования, я пытался использовать AtomicReference
, чтобы лениво получить Mono<ClientResponse>
, чтобы HTTP-запрос не повторялся снова и снова.
Это не работает должным образом поскольку подписавшаяся на Mono<ClientResponse>
, опубликованная exchange()
, сделавшая что-то с номером-н-и-ответом *1011* снова и снова вызовет свой внутренний exchangeFunction.exchange
.
Можно ли обернуть опубликованное Mono<ClientResponse>
с чем-то компенсировать эффект Mono.defer
? Или есть способ обойти его, не меняя структуру кода моего варианта использования?
========== Работоспособное решение ==========
Вдохновленный принятым ответом, я изменил свой код следующим образом:
final WebClient webClient = WebClient.create("http://some-base-url");
final AtomicReference<Mono<ClientResponse>> responseRef = new AtomicReference<>(null);
Flux.fromIterable(Arrays.asList(1, 2, 3))
.flatMap(num -> {
if (...some condition...) {
return responseRef.updateAndGet(response -> response == null
? webClient.get().uri("/some-path").exchange().cache()
: response)
.flatMap(response -> {...do something with num and response...});
} else {
return Mono.just(...something...);
}
})
...
Обратите внимание на cache()
после exchange()
. Кэш Mono
превращает его в горячий источник и кеширует последние излучаемые сигналы для дальнейших подписчиков. Завершение и ошибка также будут воспроизведены.