Как пропустить испорченные (не сериализуемые) сообщения в Spring Kafka Consumer? - PullRequest
0 голосов
/ 16 ноября 2018

Этот вопрос относится к Spring Kafka, связанному с Apache Kafka с потребителем высокого уровня: пропуск поврежденных сообщений

Есть ли способ настроить потребителя Spring Kafka на пропуск записи, которая не может быть прочитана / обработана (повреждена)?

Я наблюдаю ситуацию, когда потребитель застревает на одной и той же записи, если ее невозможно десериализовать. Это ошибка, которую выбрасывает потребитель.

Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of java.time.LocalDate: no long/Long-argument constructor/factory method to deserialize from Number value 

Потребитель опрашивает тему и продолжает печатать одну и ту же ошибку в цикле, пока программа не будет убита.

В @KafkaListener, который имеет следующие конфигурации фабрики Consumer,

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

1 Ответ

0 голосов
/ 16 ноября 2018

Вам нужно ErrorHandlingDeserializer: https://docs.spring.io/spring-kafka/docs/2.2.0.RELEASE/reference/html/_reference.html#error-handling-deserializer

Если вы не можете перейти к этой 2.2 версии, рассмотрите возможность реализовать свою собственную и вернуть null для тех записей, которые не могут быть десериализованыправильно.

Исходный код здесь: https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ErrorHandlingDeserializer2.java

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...