Как отправить смещение кафки после обработки записей партии - PullRequest
0 голосов
/ 03 октября 2018

Я использую spring-kafka и использую пакетные записи из раздела kafka и отправляю смещение на AbstractMessageListenerContainer.AckMode.BATCH.

В моем случае обработка пакетных записей занимает время (приблизительно 20 секунд), а поток потребителя ожидаетдо тех пор, пока не завершится пакетный процесс, а затем снова выполните опрос (в этом опросе отправляется смещение).В этом случае я назначу List записей потоку (имя: ProcessThread), который обработает все записи и выдаст результат обратно в поток потребителя, а затем поток потребителя запишет результат.(во всем этом потоке потребителя процесс будет ждать, пока не получит результаты от ProcessThread, что приводит к низкой производительности.

Есть ли способ, которым ProcessThread позаботится о передаче смещения в kafka?потребительскому потоку не нужно будет ждать, и для каждого опроса он будет создавать новый processThread

В моем случае у меня есть тема с 20 разделами и 10 модулями, в каждом модуле с двумя потоками потребителей (Spring Kafka одновременных потребителей), каждый опрос 100 записей (обработка всех этих записей с использованием весенней загрузки потоков @Async)

При описанной выше конфигурации я могу обработать 1 миллион записей за 2 часа, и мне нужно перетащить его как минимум до 40 минут.

Любая помощь приветствуется 101

1 Ответ

0 голосов
/ 03 октября 2018

Вы должны обновить по крайней мере до 1.3.7.Текущая версия 2.1.10.В них нет отдельной темы.Только потребительский поток может отправлять смещения.Потребитель не является потокобезопасным.

Не следует переходить на отдельный поток.Используйте более высокий параллелизм и больше разделов.

...