Реактор кафка + параллелизм + обратное давление - PullRequest
0 голосов
/ 17 июня 2019

У меня есть логика: я получаю сообщения из одной темы (1 раздел), делаю медленный расчет (может занять 100 мс-2 с) и отправляю результат в другую тему.Я хочу использовать библиотеку Reactor kafka и применить противодавление с помощью метода limitRate, однако, когда я хочу выполнить вычисление в нескольких потоках, этот limitRate не работает (запросы «по запросу» отправляются без ожидания завершения расчета).Я думал, что, когда работа завершена в одном из потоков, посылается сигнал, и подписчик может отправить запрос на следующие сообщения.

У кого-нибудь уже была такая проблема?

public void runFlow() {
    KafkaSender<String, String> kafkaSender = createKafkaSender();
    KafkaReceiver<String, String> kafkaReceiver = createKafkaReceiver();
    Scheduler scheduler = Schedulers.newParallel("my-scheduler", 4);

    kafkaReceiver
        .receiveAtmostOnce()
        .log()
        .limitRate(2)
        .parallel()
        .runOn(scheduler)
        .map(m -> SenderRecord.create(workerTask.processMessage(m), m.offset()))
        .as(kafkaSender::send)
        .subscribe();
}

Я хочу создать ситуацию, подобную той, в которой я находился, когда:

kafkaReceiver
    .receiveAtmostOnce()
    .log()
    .limitRate(2)
    //.parallel()
    //.runOn(scheduler)
    .map(m -> SenderRecord.create(workerTask.processMessage(m), m.offset()))
    .as(kafkaSender::send)
    .subscribe();

тогда я вижу, что запрос на новые сообщения отправляется после завершения предыдущего вычисления.

...