У меня есть логика: я получаю сообщения из одной темы (1 раздел), делаю медленный расчет (может занять 100 мс-2 с) и отправляю результат в другую тему.Я хочу использовать библиотеку Reactor kafka и применить противодавление с помощью метода limitRate, однако, когда я хочу выполнить вычисление в нескольких потоках, этот limitRate не работает (запросы «по запросу» отправляются без ожидания завершения расчета).Я думал, что, когда работа завершена в одном из потоков, посылается сигнал, и подписчик может отправить запрос на следующие сообщения.
У кого-нибудь уже была такая проблема?
public void runFlow() {
KafkaSender<String, String> kafkaSender = createKafkaSender();
KafkaReceiver<String, String> kafkaReceiver = createKafkaReceiver();
Scheduler scheduler = Schedulers.newParallel("my-scheduler", 4);
kafkaReceiver
.receiveAtmostOnce()
.log()
.limitRate(2)
.parallel()
.runOn(scheduler)
.map(m -> SenderRecord.create(workerTask.processMessage(m), m.offset()))
.as(kafkaSender::send)
.subscribe();
}
Я хочу создать ситуацию, подобную той, в которой я находился, когда:
kafkaReceiver
.receiveAtmostOnce()
.log()
.limitRate(2)
//.parallel()
//.runOn(scheduler)
.map(m -> SenderRecord.create(workerTask.processMessage(m), m.offset()))
.as(kafkaSender::send)
.subscribe();
тогда я вижу, что запрос на новые сообщения отправляется после завершения предыдущего вычисления.