У меня есть топи c с 300 разделами и 100 потребителями / машинами. Я использую Spring Kafka в качестве своей базовой структуры для реализации Потребителей Kafka.
Я использую ConcurrentKafkaListenerContainerFactory и устанавливаю параллелизм на 3, поэтому в теории у меня должно быть 300 Consumer Containers, и один раздел должен быть подключен к один контейнер, таким образом, разделы равномерно распределяются между 100 машинами.
Для первого конструктора kafka будет распределять разделы по потребителям. Для второго конструктора ConcurrentMessageListenerContainer распределяет разделы TopicPartition по делегату KafkaMessageListenerContainer.
Если, скажем, предоставлено 6 разделов TopicPartition, а параллелизм равен 3; каждый контейнер получит 2 раздела. Для 5 TopicPartition s, 2 контейнера получат 2 раздела, а третий получит 1. Если параллелизм больше, чем количество TopicPartitions, параллелизм будет уменьшен таким образом, что каждый контейнер получит один раздел.
Но я не вижу описанного выше поведения, я вижу, что некоторые из Контейнеров / Машин простаивают, в то время как другие подключены к 6 разделам, что вызывает отставание в Кафке Топи c.
Я делаю что-то не так, как я могу убедиться, что разделы равномерно распределены между контейнерами, и ни один контейнер не сопоставлен более чем с одним разделом? Пожалуйста, помогите.
key.deserializer : StringDeserializer
value.deserializer : [CUSTOM DESERIALIZER]
enable.auto.commit : false
max.poll.records : 5
group.id : [MY GROUP]
partition.assignment.strategy : StickyAssignor
max.partition.fetch.bytes : 1048576
bootstrap.servers : [SERVERS]
auto.commit.interval.ms : 3000
auto.offset.reset : latest
factory.setConcurrency(3);
@KafkaListener(topics = "#{kafkaTopicConfig.getStoreSupply()}", containerFactory = EI_LISTNER_FACTORY)
EI_LISTNER_FACTORY - Боб ..
@Bean(EI_LISTNER_FACTORY)
public ConcurrentKafkaListenerContainerFactory<String, AggQuantityByPrimeValue> eiKafkaListenerContainerFactory() {
Boolean eiCnsumerStartup = [START_UP From Configuration]
Integer concurrentThreadCount = 3;
Map<String, Object> config = [properties from ABOVE]
ConcurrentKafkaListenerContainerFactory<String, AggQuantityByPrimeValue> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));
factory.setAutoStartup(eiConsumerStartup);
if (config.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).equals("false")) {
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setConcurrency(concurrentThreadCount);
}
return factory;
}