У нас есть Kafka Consumer (параллелизм 5) с ручным подтверждением. В приведенной ниже реализации иногда получение исключения Подтверждение не может быть завершено, поскольку группа уже перебалансирована ...
В сценарии исключения сообщение не подтверждается, и оно используется один раз. очередной раз.
Любые предложения по изменениям конфигурации, не влияющие на производительность потребителя ???
Consumer Factory
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
/*
* Reading of the variables from yml file
*/
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// SASL and JAAS properties
if(null!=kafkaTrustStoreFileLoc && !kafkaTrustStoreFileLoc.isEmpty() && isNotNullSslParams()) {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSecurityProtocol);
props.put(SaslConfigs.SASL_MECHANISM, kafkaSaslMechanism);
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaTrustStoreFileLoc);
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, kafkaSslIdentifyAlg);
String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
String jaasCfg = String.format(jaasTemplate, kafkaUsername, kafkaPassword);
props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasCfg);
}
return new DefaultKafkaConsumerFactory<>(props);
}
protected boolean isNotNullSslParams() {
return null!=kafkaSecurityProtocol
&& null!= kafkaSaslMechanism
&& null!= kafkaSslIdentifyAlg
&& null!= kafkaUsername
&& null!= kafkaPassword;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setConcurrency(5);
return factory;
}
}
Потребитель
@KafkaListener(topics = {"${kafka.topic}" }, containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload final String message,
@Header(KafkaHeaders.RECEIVED_TOPIC) final String topic, Acknowledgment ack) {
try {
log.debug("Received '{}'-message {} from Kafka", topic, message);
messageReceived(topic, message); //processing message
ack.acknowledge(); //ack the message
} catch (Exception e) {
log.error("Kafka Listener Exception : {} -> {}", e.getMessage(), e);
}
}