Потребитель Kafka НЕ является потокобезопасным. Весь сетевой ввод / вывод происходит в потоке приложения, выполняющего вызов. Пользователь несет ответственность за обеспечение правильной синхронизации многопоточного доступа. Несинхронизированный доступ приведет к ConcurrentModificationException.
Если потребителю назначено несколько разделов для извлечения данных, он будет пытаться использовать их все одновременно, эффективно предоставляя этим разделам одинаковый приоритет для потребления. Однако в некоторых случаях потребители могут сначала сосредоточиться на извлечении из некоторого подмножества назначенных разделов на полной скорости, и начинать выборку других разделов только тогда, когда эти разделы имеют мало данных или вообще не используют их.
Spring-Кафка
ConcurrentKafkaListenerContainerFactory
используется для создания контейнеров для аннотированных методов с @KafkaListener
Есть две MessageListenerContainer
весной кафки
KafkaMessageListenerContainer
ConcurrentMessageListenerContainer
KafkaMessageListenerContainer
получает все сообщения от всех тем или разделов в одной теме. ConcurrentMessageListenerContainer
делегирует один или несколько экземпляров KafkaMessageListenerContainer
для обеспечения многопоточного потребления.
Использование ConcurrentMessageListenerContainer
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
Имеет свойство параллелизма. Например, container.setConcurrency (3) создает три KafkaMessageListenerContainer
экземпляра.
Если у вас есть шесть TopicPartition
экземпляров, предоставляется и параллелизм равен 3; каждый контейнер получает два раздела. Для пяти экземпляров TopicPartition два контейнера получают два раздела, а третий получает один. Если параллелизм больше, чем число TopicPartitions, параллелизм уменьшается, так что каждый контейнер получает один раздел.
вот яркий пример с документацией здесь