с Kafka Manual время от времени подтверждают получение «Исключение Kafka Listner: фиксация не может быть выполнена» - PullRequest
0 голосов
/ 24 октября 2019

У нас есть Kafka Consumer (параллелизм 5) с ручным подтверждением. В приведенной ниже реализации иногда получение исключения Подтверждение не может быть завершено, поскольку группа уже перебалансирована ...

В сценарии исключения сообщение не подтверждается, и оно используется один раз. очередной раз.

Любые предложения по изменениям конфигурации, не влияющие на производительность потребителя ???

Consumer Factory

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

/*
 * Reading of the variables from yml file
 */

@Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // SASL and JAAS properties
        if(null!=kafkaTrustStoreFileLoc && !kafkaTrustStoreFileLoc.isEmpty() && isNotNullSslParams()) {
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSecurityProtocol);
            props.put(SaslConfigs.SASL_MECHANISM, kafkaSaslMechanism);
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaTrustStoreFileLoc);
            props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, kafkaSslIdentifyAlg); 

            String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";"; 
            String jaasCfg = String.format(jaasTemplate, kafkaUsername, kafkaPassword);
            props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasCfg);
        }

        return new DefaultKafkaConsumerFactory<>(props);
    }

    protected boolean isNotNullSslParams() {
        return null!=kafkaSecurityProtocol 
                && null!= kafkaSaslMechanism
                && null!= kafkaSslIdentifyAlg
                && null!= kafkaUsername
                && null!= kafkaPassword;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.setConcurrency(5);
        return factory;
    }
}

Потребитель

@KafkaListener(topics = {"${kafka.topic}" }, containerFactory = "kafkaListenerContainerFactory")
    public void listen(@Payload final String message,
            @Header(KafkaHeaders.RECEIVED_TOPIC) final String topic, Acknowledgment ack) {
        try {
            log.debug("Received '{}'-message {} from Kafka", topic, message);
                messageReceived(topic, message); //processing message       
                ack.acknowledge(); //ack the message
        } catch (Exception e) {
            log.error("Kafka Listener Exception : {} -> {}", e.getMessage(), e);
        }
    }

Ответы [ 2 ]

0 голосов
/ 24 октября 2019

Вы можете попробовать ниже параметры, объясненные здесь

session.timeout.ms (по умолчанию: 6 секунд) Во время каждого опроса координатор потребителя посылает пульс брокеру, чтобы убедиться, чтоэта потребительская сессия жива и активна. Если брокер не получил пульс до брокера session.timeout.ms, то брокер покидает этого потребителя и выполняет ребалансирование

Примечание. Если вы увеличиваете session.timeout.ms, пожалуйста, посмотрите, требуется ли настроить группу брокераНастройка .max.session.timeout.ms.

max.poll.interval.ms : (по умолчанию: 5 минут) Максимальная задержка между вызовами poll () при использованииуправление группами потребителей. Это означает, что максимальное время ожидания потребителя будет бездействующим до получения большего количества записей. Если функция poll () не вызывается до истечения этого тайм-аута, то считается, что потребитель потерпел неудачу, и группа перебалансировала

max.poll. records : (по умолчанию: 500) Максимальное количество записей, возвращаемых за один вызов poll (). Вы можете попытаться сократить время обработки меньшего количества записей за один раз

Если вы все еще сталкиваетесь с проблемой вместе с указанным выше свойством, помимо «подписаться» попробуйте использовать раздел «назначить» в своем клиенте.

НижеЕсть несколько соображений перед установкой значения:

  1. group.max.session.timeout.ms> session.timeout.ms> group.min.session.timeout.ms
  2. request.timeout.ms> session.timeout.ms
  3. heartbeat.interval.ms ~ session.timeout.ms) / 3 (приблизительно)
0 голосов
/ 24 октября 2019

У вас слишком много времени на обработку всех записей, полученных за последний poll().

Обработка всех записей каждого опроса должна быть завершена в max.poll.interval.ms (ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG) - по умолчанию5 минут.

Выясните, сколько времени занимает обработка каждой записи, и либо увеличьте max.poll.interval.ms, либо уменьшите max.poll.records.

...