Ошибка при запуске весеннего загрузочного проекта Kafka - PullRequest
0 голосов
/ 03 апреля 2019

Ошибка при запуске проекта весенней загрузки kafka.

пружинный ботинок: 2.1.2. ВЫПУСК Версия Spring kafka: 2.2.5. ВЫПУСК

Потребитель не может быть настроен для автоматической фиксации для ackMode MANUAL_IMMEDIATE

Конфигурация потребителя

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<String, Object>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return props;
}

Кафка Слушатель контейнерного завода

@Bean("kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(RetryTemplate retryTemplate) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.setRetryTemplate(retryTemplate);
    factory.setRecoveryCallback(context -> {
        log.error("Maximum retry policy has been reached {}", context.getAttribute("record"));
        Acknowledgment ack = (Acknowledgment) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT);
        ack.acknowledge();
        return null;
    });
    factory.setConcurrency(Integer.parseInt(kafkaConcurrency));
    return factory;
}

1 Ответ

0 голосов
/ 03 апреля 2019

Для режима MANUAL_IMMEDIATE ack (для любого ручного режима, в основном) свойство потребителя ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG должно быть отключено.

Вот причина этого исключения.

Полагаю, вы не можете просто использовать spring.kafka.consumer.enableAutoCommit в своем application.properties.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...