Потребительский исход Kafka уменьшается как входящий в topi c увеличивается - PullRequest
0 голосов
/ 27 апреля 2020

Я вижу проблему в моем Kafka Consumer, где я вижу, что пропускная способность Consumer зависит от высокой нагрузки от производителя. Я вижу, что пропускная способность возрастает до 600 КБ / мин при низкой загрузке источника, но пропускная способность снижается примерно до 350 КБ при высокой нагрузке источника, что вызывает большие задержки в Kafka topi c. Это несоответствие и могу ли я что-то сделать в реализации для потребителей? Ниже приведена конфигурация Consumer

. Я реализовал Kafka Consumer с использованием Spring-Kafka

key.deserializer : StringDeserializer
value.deserializer : [CUSTOM DESERIALIZER]
enable.auto.commit  : false
max.poll.records : 5
group.id : [MY GROUP]
partition.assignment.strategy : StickyAssignor
max.partition.fetch.bytes : 1048576
bootstrap.servers : [SERVERS]
auto.commit.interval.ms : 3000
auto.offset.reset : latest


factory.setConcurrency(3);


@KafkaListener(topics = "#{kafkaTopicConfig.getStoreSupply()}", containerFactory = EI_LISTNER_FACTORY)

//EI_LISTNER_FACTORY is a Bean..

@Bean(EI_LISTNER_FACTORY)
public ConcurrentKafkaListenerContainerFactory<String, AggQuantityByPrimeValue> eiKafkaListenerContainerFactory() {

    Boolean eiCnsumerStartup = [START_UP From Configuration]

    Integer concurrentThreadCount = 3;

    Map<String, Object> config = [properties from ABOVE]
    ConcurrentKafkaListenerContainerFactory<String, AggQuantityByPrimeValue> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));
    factory.setAutoStartup(eiConsumerStartup);

    if (config.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).equals("false")) {
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.setConcurrency(concurrentThreadCount);
    }
    return factory;
}
.
...