Как отложить повторный запрос на получение WebClient - PullRequest
0 голосов
/ 08 февраля 2019

Я использую Spring 5 WebClient для многократного извлечения некоторого состояния работающего процесса из API REST.

С помощью здесь Я сейчас пришел к этому решению:

webClient.get().uri(...).retrieve.bodyToMono(State.class)
          .repeat()
          .skipUntil(state -> stateFinished())
          .limitRequest(1)
          .subscribe(state -> {...});

Пока это работает, запрос get запускается с очень высокой скоростью.Как правильно ограничить частоту запросов, скажем, 1 запросом в секунду?

Я пытался использовать delayElements(Duration.ofSeconds(1)), но это только задерживает результаты, а не сам запрос.

Ответы [ 4 ]

0 голосов
/ 11 февраля 2019

То, что вы используете delayElements, говорит мне, что вы ставите его после повтора.Что вы хотите отложить - это подписка на WebClient.

webClient
      .get()
      .uri(...)
      .retrieve
      .bodyToMono(State.class)
      .delaySubscription(Duration.ofSeconds(1)) //Just add this before the repeat
      .repeat()
      .skipUntil(state -> stateFinished())
      .limitRequest(1)
      .subscribe(state -> {...});

Это гарантирует, что между ответом n-го запроса и инициированием n + 1-го запроса есть секунда.Если вам нужна фиксированная частота вызовов независимо от времени, затрачиваемого на каждый запрос ответа, оберните ваш код в Flux.interval, как предлагает Роман.

0 голосов
/ 09 февраля 2019

Вы можете использовать оператор repeatWhen с пользовательской реализацией компаньона Publisher

Mono.just("test")
        .repeatWhen(longFlux -> Flux.interval(Duration.ofSeconds(1)))
        .take(5)
        .log()
        .blockLast();

или с функцией Repeate из реактора-аддона

Mono.just("test")
        .repeatWhen(Repeat.times(Long.MAX_VALUE)
                .fixedBackoff(Duration.ofSeconds(1)))
        .take(5)
        .log()
        .blockLast();
0 голосов
/ 10 февраля 2019

Альтернативное решение вашей проблемы

Flux.interval(Duration.ZERO, Duration.ofSeconds(1))
        .onBackpressureDrop()
        .concatMap(i -> webClientCall(...), 1)
        //or flatMap() if you want send request each second
        .filter(state -> stateFinished(state))
        .next()
        .timeout(Duration.ofSeconds(...))
        //
        .subscribe(state -> {...});

Но помните, что если вы подпишетесь сами (не на Spring), то контекст подписчика реактора не будет распространен на ваш запрос (без контекста безопасности, спящего и т. Д.)...)

0 голосов
/ 09 февраля 2019

В вашем случае есть еще один маленький обходной путь, когда вы упаковываете каждый свой звонок с помощью Flux, используемого в качестве ограничителя.

.zipWith(Flux.interval(Duration.of(1, ChronoUnit.SECONDS)))

Хотя я бы подумал, что delayElements() работает, возможно, вы не сделалиположить его на правильную стадию вашего стека Webclient.

...