Использование Flux.interval вместо @Scheduled при весенней загрузке - PullRequest
0 голосов
/ 05 февраля 2020

Я создаю приложение весенней загрузки с использованием Spring Webflux и хочу сделать приложение полностью неблокирующим. Само приложение имеет несколько конечных точек REST и пакетное задание, которое должно запускаться каждые несколько секунд. Для пакетного задания я пытаюсь Flux.interval(Duration.ofMillis(1000)) создать длинные значения, которые игнорирую, и запустить запланированное задание.

Flux.interval(Duration.ofMillis(1000))
    .flatMap(ignore -> doSomething())
    .subscribe();

Однако через некоторое время я получаю сообщение об ошибке

reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit tick 257 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)

Может кто-нибудь сказать мне, как преодолеть эту проблему?

1 Ответ

1 голос
/ 06 февраля 2020

Причиной проблемы, скорее всего, является то, что операция doSomething() занимает больше времени, чем указанный интервал потока, что означает, что через некоторое время doSomething задания перекрывают друг друга и срабатывает противодавление. Поскольку Flux.interval является горячей источник (это означает, что он не излучает сигналы по требованию), и flatMap имеет ограничение параллелизма по умолчанию (256), оператор перегружен, и это приводит к OverflowException.

Исходя из ваших требований, Есть несколько возможных решений этой проблемы:

1. Игнорируйте ошибку переполнения и отбросьте сигналы, которые могли бы переполниться

Это означает, что иногда мы пропускаем секунду и не планируем задание на интервал, если у нас уже есть много (256) в процессе.

Flux.interval(Duration.ofMillis(1000))
    .onBackpressureDrop()
    .flatMap(ignore -> doSomething())

2. Установите для flatMap параллелизма более высокое значение

Через некоторое время это может привести к возникновению OverflowException, но это задержит проблему (возможно, не лучшее решение).

Flux.interval(Duration.ofMillis(1000))
    .flatMap(ignore -> doSomething(), Integer.MAX_VALUE)

3. Не позволяйте заданиям накладываться друг на друга

Мы переключаемся с горячего источника на холодный, что исключает возможность переполнения. Однако мы теряем гарантию планирования события каждую секунду. Вместо этого они будут запланированы по требованию, когда предыдущее задание будет завершено и по крайней мере 1 секунда пройдена.

Mono.just(1).repeat() // infinite Flux with backpressure
    .delayElements(Duration.ofMillis(1000))
    .concatMap(ignore -> doSomething())

Вы также можете комбинировать это решение с предыдущим, если вы хорошо справляетесь с перекрывающимися заданиями и определяете разумный параллелизм уровень в flatMap звонке.

...