Ошибка при запуске проекта весенней загрузки kafka.
пружинный ботинок: 2.1.2. ВЫПУСК
Версия Spring kafka: 2.2.5. ВЫПУСК
Потребитель не может быть настроен для автоматической фиксации для ackMode MANUAL_IMMEDIATE
Конфигурация потребителя
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<String, Object>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
Кафка Слушатель контейнерного завода
@Bean("kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(RetryTemplate retryTemplate) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setRetryTemplate(retryTemplate);
factory.setRecoveryCallback(context -> {
log.error("Maximum retry policy has been reached {}", context.getAttribute("record"));
Acknowledgment ack = (Acknowledgment) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT);
ack.acknowledge();
return null;
});
factory.setConcurrency(Integer.parseInt(kafkaConcurrency));
return factory;
}