ErrorHandlingDeserializer
Когда десериализатору не удается десериализовать сообщение, Spring не может обработать проблему, потому что это происходит до того, как возвращается poll (). Чтобы решить эту проблему, версия 2.2 представила ErrorHandlingDeserializer. Этот десериализатор делегирует реальный десериализатор (ключ или значение). Если делегату не удается десериализовать содержимое записи, ErrorHandlingDeserializer возвращает вместо этого исключение DeserializationException, содержащее причину и необработанные байты. При использовании MessageListener уровня записи, если ключ или значение содержит исключение DeserializationException, контейнер ErrorHandler вызывается с ошибочным ConsumerRecord. При использовании BatchMessageListener сбойная запись передается приложению вместе с оставшиеся записи в пакете, поэтому слушатель приложения должен проверить, является ли ключ или значение в конкретной записи исключением DeserializationException.
Итак, в соответствии с вашим кодом вы используете record-level MessageListener
, затем просто добавьте ErrorHandler
к Container
Обработка исключений
Если ваш обработчик ошибок реализует этот интерфейс, вы можете, например, соответствующим образом настроить смещения. Например, чтобы сбросить смещение и воспроизвести сообщение об ошибке, вы можете сделать что-то вроде следующего: заметьте, однако, что это упрощенные реализации, и вам, вероятно, понадобится дополнительная проверка в обработчике ошибок.
@Bean
public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
return (m, e, c) -> {
this.listen3Exception = e;
MessageHeaders headers = m.getHeaders();
c.seek(new org.apache.kafka.common.TopicPartition(
headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
headers.get(KafkaHeaders.OFFSET, Long.class));
return null;
};
}
Или вы можете сделать пользовательскую реализацию, как в этом примере
@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericRecord>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setErrorHandler(new ErrorHandler() {
@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
String topics = s.split("-")[0];
int offset = Integer.valueOf(s.split("offset ")[1]);
int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);
TopicPartition topicPartition = new TopicPartition(topics, partition);
//log.info("Skipping " + topic + "-" + partition + " offset " + offset);
consumer.seek(topicPartition, offset + 1);
System.out.println("OKKKKK");
}
@Override
public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {
}
@Override
public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord, Consumer<?,?> consumer) {
String s = e.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
String topics = s.split("-")[0];
int offset = Integer.valueOf(s.split("offset ")[1]);
int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);
TopicPartition topicPartition = new TopicPartition(topics, partition);
//log.info("Skipping " + topic + "-" + partition + " offset " + offset);
consumer.seek(topicPartition, offset + 1);
System.out.println("OKKKKK");
}
});
return factory;
}