Я использую приложение весенней загрузки, где потребитель kafka потребляет сообщения и сохраняет их в Hbase.
в случае каких-либо исключений при чтении сообщения из kafka topi c, как ограничить механизм повторных попыток в случай пакетного списка. я использовал приведенную ниже конфигурацию kafka, но она повторяется бесконечно долго.
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
props.put(ConsumerConfig.GROUP_ID_CONFIG, gropuId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordSize);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return props;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(1);
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setSyncCommits(true);
factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
factory.setBatchListener(true);
return factory;
}