Поддержка последующих запросов, которые пополняются медленнее, чем тики? - PullRequest
0 голосов
/ 14 марта 2019

Приложение загрузки My Spring 5.2 использует Spring WebClient для проверки производительности REST API. Каждый API вызывается n раз в «фиксированном» interval цикле:

AtomicLong rec = new AtomicLong();

for (long sendAt = System.nanoTime(), sent = 0; rec.get() < n;) {
    if (sent < n && System.nanoTime() >= sendAt) {
        webClient
            .post()
            .uri(uri)
            .accept(ACCEPT_TYPE)
            .header(SOME_HEADER, someHeaderValue())
            .body(BodyInserters.fromObject(obj)
            .exchange()
            .doOnSuccess(response -> rec.incrementAndGet())
            .subscribe()
            ;

        sendAt += interval;

        sent++;
    }
}

Это отлично работает. Однако написание эквивалента в декларативном / реактивном стиле меня уклоняет. Наивно складывается в петле и условие:

webClient
    .post()
    .uri(uri)
    .accept(ACCEPT_TYPE)
    .header(SOME_HEADER, someHeaderValue())
    .body(BodyInserters.fromObject(obj)
    .exchange()
    .repeatWhen(f -> Flux.interval(Duration.ofNanos(interval)))
    .take(n)
    .blockLast()
    ;

терпит неудачу, когда клиент не может идти в ногу:

Could not emit tick 73685 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)

Итак, что является правильным / реактивным способом достижения того, что делает императивный цикл?

1 Ответ

0 голосов
/ 15 марта 2019

Из комментария Дэвида Карнока;с его расширениями RxJava2:

<dependency>
    <groupId>com.github.akarnokd</groupId>
    <artifactId>rxjava2-extensions</artifactId>
    <version>0.20.8</version>
</dependency>

вы можете сделать:

webClient
    .post()
    .uri(uri)
    .accept(ACCEPT_TYPE)
    .header(SOME_HEADER, someHeaderValue())
    .body(BodyInserters.fromObject(obj)
    .exchange()
    .repeatWhen(f -> Flowables.intervalBackpressure(interval, TimeUnit.NANOSECONDS))
    .take(n)
    .blockLast()
    ;

, что не приводит к сбою, когда клиент не успевает, но .header(SOME_HEADER, someHeaderValue()) вызывается только один раз (в отличие отимперативная петля);someHeaderValue() должен обновляться при каждом повторении.

...