Я вижу проблему в моем Kafka Consumer, где я вижу, что пропускная способность Consumer зависит от высокой нагрузки от производителя. Я вижу, что пропускная способность возрастает до 600 КБ / мин при низкой загрузке источника, но пропускная способность снижается примерно до 350 КБ при высокой нагрузке источника, что вызывает большие задержки в Kafka topi c. Это несоответствие и могу ли я что-то сделать в реализации для потребителей? Ниже приведена конфигурация Consumer
. Я реализовал Kafka Consumer с использованием Spring-Kafka
key.deserializer : StringDeserializer
value.deserializer : [CUSTOM DESERIALIZER]
enable.auto.commit : false
max.poll.records : 5
group.id : [MY GROUP]
partition.assignment.strategy : StickyAssignor
max.partition.fetch.bytes : 1048576
bootstrap.servers : [SERVERS]
auto.commit.interval.ms : 3000
auto.offset.reset : latest
factory.setConcurrency(3);
@KafkaListener(topics = "#{kafkaTopicConfig.getStoreSupply()}", containerFactory = EI_LISTNER_FACTORY)
//EI_LISTNER_FACTORY is a Bean..
@Bean(EI_LISTNER_FACTORY)
public ConcurrentKafkaListenerContainerFactory<String, AggQuantityByPrimeValue> eiKafkaListenerContainerFactory() {
Boolean eiCnsumerStartup = [START_UP From Configuration]
Integer concurrentThreadCount = 3;
Map<String, Object> config = [properties from ABOVE]
ConcurrentKafkaListenerContainerFactory<String, AggQuantityByPrimeValue> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));
factory.setAutoStartup(eiConsumerStartup);
if (config.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).equals("false")) {
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setConcurrency(concurrentThreadCount);
}
return factory;
}
.