Я создаю реакторный приемник для пакетной обработки ряда записей в соответствии с фиксированным количеством записей или продолжительностью. Я придумал эту простую топологию, но мне интересно:
- Где должна происходить буферизация? Это может произойти до или после публикации
- Нужно ли вообще публиковать здесь новый планировщик? Если мой метод
doOnEach()
блокируется при включенной буферизации, будет ли заблокирован потребитель до того, как буфер заполнится?
- Требуется ли
onBackpressureBuffer()
, если я использую bufferTimeout()
return KafkaReceiver
.create(options)
.receive()
.onBackpressureBuffer()
.bufferTimeout(batchSize, batchTimeout)
.publishOn(Schedulers.newSingle("some name"))
.doOnEach(x -> someBlockingWork(x)))
.doOnCancel(scheduler::dispose)
.doOnError( x -> log.error("Some error occurred: {}", x) )
.doOnComplete (() -> log.info("Shutting down Kafka consumer") );