Spring WebClient: повторите попытку с WebFlux.fn + реактор-аддоны - PullRequest
0 голосов
/ 24 марта 2020

Я пытаюсь добавить условную повторную попытку для WebClient с Kotlin Coroutines + WebFlux.fn + реакторные дополнения:

suspend fun ClientResponse.asResponse(): ServerResponse =
    status(statusCode())
        .headers { headerConsumer -> headerConsumer.addAll(headers().asHttpHeaders()) }
        .body(bodyToMono(DataBuffer::class.java), DataBuffer::class.java)
        .retryWhen { 
            Retry.onlyIf { ctx: RetryContext<Throwable> -> (ctx.exception() as? WebClientResponseException)?.statusCode in retryableErrorCodes }
                .exponentialBackoff(ofSeconds(1), ofSeconds(5))
                .retryMax(3)
                .doOnRetry { log.error("Retry for {}", it.exception()) }
        )
        .awaitSingle()

, также добавляя условие перед повторной попыткой

if (statusCode().isError) {
        body(
            BodyInserters.fromPublisher(
                Mono.error(StatusCodeError(status = statusCode())),
                StatusCodeException::class.java
            )
        )
    } else {
        body(bodyToMono(DataBuffer::class.java), DataBuffer::class.java)
    }

Звонок выглядит так:

suspend fun request(): ServerResponse =
           webClient/*...*/
                    .awaitExchange()
                    .asResponse()

1 Ответ

0 голосов
/ 27 марта 2020

Этот весенний веб-клиент: повторите попытку с заданным значением c ошибка дал мне подсказку, чтобы ответить на вопрос:

.awaitExchange() возвращает ClientResponse, а не Mono<ClientReponse> Это означает, что моя повторная попытка действовала на bodyToMono вместо операции exchange().

Решение теперь выглядит как

suspend fun Mono<ClientResponse>.asResponse(): ServerResponse =
    flatMap {
        if (it.statusCode().isError) {
            Mono.error(StatusCodeException(status = it.statusCode()))
        } else {
            it.asResponse()
        }
    }.retryWhen(
        Retry.onlyIf { ctx: RetryContext<Throwable> ->
            (ctx.exception() as? StatusCodeException)?.shouldRetry() ?: false
        }
            .exponentialBackoff(ofSeconds(1), ofSeconds(5))
            .retryMax(3)
            .doOnRetry { log.error { it.exception() } }
    ).awaitSingle()

private fun ClientResponse.asResponse(): Mono<ServerResponse> =
    status(statusCode())
        .headers { headerConsumer -> headerConsumer.addAll(headers().asHttpHeaders()) }
        .body(bodyToMono(DataBuffer::class.java), DataBuffer::class.java)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...