Я пытаюсь написать потребитель kafka, используя библиотеку spring-kafka версии 2.3.0.M2.Для обработки ошибок во время выполнения я использую SeekToCurrentErrorHandler.class с DeadLetterPublishingRecoverer в качестве моего восстановителя.Это работает нормально только тогда, когда мой потребительский код выдает исключение, но не удается, если не удается десериализовать сообщение.
Я попытался реализовать ErrorHandler сам, и мне это удалось, но с этим подходом я сам заканчиваю тем, что писал код DLT для обработки сообщений об ошибкахчто я не хочу делать.
Ниже приведены мои свойства кафки
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: group_id
auto-offset-reset: latest
key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
properties:
spring.json.trusted.packages: com.mypackage
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory,
KafkaTemplate<Object, Object> template) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), maxFailures));}