Причиной проблемы, скорее всего, является то, что операция doSomething()
занимает больше времени, чем указанный интервал потока, что означает, что через некоторое время doSomething
задания перекрывают друг друга и срабатывает противодавление. Поскольку Flux.interval
является горячей источник (это означает, что он не излучает сигналы по требованию), и flatMap
имеет ограничение параллелизма по умолчанию (256), оператор перегружен, и это приводит к OverflowException
.
Исходя из ваших требований, Есть несколько возможных решений этой проблемы:
1. Игнорируйте ошибку переполнения и отбросьте сигналы, которые могли бы переполниться
Это означает, что иногда мы пропускаем секунду и не планируем задание на интервал, если у нас уже есть много (256) в процессе.
Flux.interval(Duration.ofMillis(1000))
.onBackpressureDrop()
.flatMap(ignore -> doSomething())
2. Установите для flatMap
параллелизма более высокое значение
Через некоторое время это может привести к возникновению OverflowException, но это задержит проблему (возможно, не лучшее решение).
Flux.interval(Duration.ofMillis(1000))
.flatMap(ignore -> doSomething(), Integer.MAX_VALUE)
3. Не позволяйте заданиям накладываться друг на друга
Мы переключаемся с горячего источника на холодный, что исключает возможность переполнения. Однако мы теряем гарантию планирования события каждую секунду. Вместо этого они будут запланированы по требованию, когда предыдущее задание будет завершено и по крайней мере 1 секунда пройдена.
Mono.just(1).repeat() // infinite Flux with backpressure
.delayElements(Duration.ofMillis(1000))
.concatMap(ignore -> doSomething())
Вы также можете комбинировать это решение с предыдущим, если вы хорошо справляетесь с перекрывающимися заданиями и определяете разумный параллелизм уровень в flatMap
звонке.