Я настроил SeekToErrorHandler с DeadLetterPublisheingRecoverer
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(primaryConsumerFactory());
factory.setConcurrency(this.kafkaConfigProperties.getConsumerConcurrency());
factory.setAutoStartup(true);
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setErrorHandler(new SeekToErrorHandler(new DeadLetterPublisheingRecoverer(kafkaTemplate()),3));
Когда из слушателя (или средства проверки) выдается исключение, после трех попыток сообщение публикуется в мертвой букве.
Проблема здесь в следующий раз, когда перезапускаю мое весеннее загрузочное приложение (или контейнер слушателя), то же сообщение снова доставляется слушателю, проходит всю последовательность и, наконец, попадает в мертвую букву. Есть ли способ избежать этого?
Я отключил автоматическую фиксацию и установил AckOnError (false) и AckMode (AckMode.RECORD);
В SeekToErrorHandler я обнаружил, что логика c вокруг SeekToUtil выдает исключение до тех пор, пока не будет выполнено настроенное число итераций и, наконец, не вызовет метод accept BiConsumer (публикация deadletter). Таким образом, контейнер должен зафиксировать запись на последнем этапе (при публикации в мертвую букву), верно? Я также просмотрел комментарий к методу ackOnError (boolean) в org.springframework.kafka.listener.ContainerProperties
Когда setAckOnError (true), я мог найти правильное поведение с тремя повторными попытками и, наконец, вызвать издателя мертвых писем. Сообщение не возвращается повторно при перезапуске контейнера слушателя
Версия Spring kafka - 2.2.6