Есть ли способ максимизировать ограничение Retry для слушателя Кафки в случае исключения - PullRequest
2 голосов
/ 08 января 2020

Я использую приложение весенней загрузки, где потребитель kafka потребляет сообщения и сохраняет их в Hbase.

в случае каких-либо исключений при чтении сообщения из kafka topi c, как ограничить механизм повторных попыток в случай пакетного списка. я использовал приведенную ниже конфигурацию kafka, но она повторяется бесконечно долго.

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, gropuId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordSize);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return props;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(1);
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setSyncCommits(true);
        factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
        factory.setBatchListener(true);
        return factory;
}
...