Реактор-кафка стратегии пакетной подачи - PullRequest
0 голосов
/ 10 апреля 2019

Я создаю реакторный приемник для пакетной обработки ряда записей в соответствии с фиксированным количеством записей или продолжительностью. Я придумал эту простую топологию, но мне интересно:

  • Где должна происходить буферизация? Это может произойти до или после публикации
  • Нужно ли вообще публиковать здесь новый планировщик? Если мой метод doOnEach() блокируется при включенной буферизации, будет ли заблокирован потребитель до того, как буфер заполнится?
  • Требуется ли onBackpressureBuffer(), если я использую bufferTimeout()
return KafkaReceiver
        .create(options)
        .receive()
        .onBackpressureBuffer()
        .bufferTimeout(batchSize, batchTimeout)
        .publishOn(Schedulers.newSingle("some name"))
        .doOnEach(x -> someBlockingWork(x)))
        .doOnCancel(scheduler::dispose)
        .doOnError( x -> log.error("Some error occurred: {}", x) )
        .doOnComplete (() -> log.info("Shutting down Kafka consumer") );
...