Я использую spring-kafka
и использую пакетные записи из раздела kafka и отправляю смещение на AbstractMessageListenerContainer.AckMode.BATCH
.
В моем случае обработка пакетных записей занимает время (приблизительно 20 секунд), а поток потребителя ожидаетдо тех пор, пока не завершится пакетный процесс, а затем снова выполните опрос (в этом опросе отправляется смещение).В этом случае я назначу List
записей потоку (имя: ProcessThread
), который обработает все записи и выдаст результат обратно в поток потребителя, а затем поток потребителя запишет результат.(во всем этом потоке потребителя процесс будет ждать, пока не получит результаты от ProcessThread
, что приводит к низкой производительности.
Есть ли способ, которым ProcessThread
позаботится о передаче смещения в kafka?потребительскому потоку не нужно будет ждать, и для каждого опроса он будет создавать новый processThread
В моем случае у меня есть тема с 20 разделами и 10 модулями, в каждом модуле с двумя потоками потребителей (Spring Kafka одновременных потребителей), каждый опрос 100 записей (обработка всех этих записей с использованием весенней загрузки потоков @Async
)
При описанной выше конфигурации я могу обработать 1 миллион записей за 2 часа, и мне нужно перетащить его как минимум до 40 минут.
Любая помощь приветствуется 101