Как отловить ошибку десериализации в Kafka-Spring? - PullRequest
0 голосов
/ 30 апреля 2019

Я получаю приложение, потребляющее сообщения кафки.

Я следовал Spring-docs об обработке ошибок десериализации, чтобы поймать исключение десериализации. Я пробовал метод failDeserializationFunction.

Это мой класс конфигурации потребителя

@Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);

        /*  Error Handling */
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
        consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
        consumerProps.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, FailedNTCMessageBodyProvider.class);

        return consumerProps;
    }

    @Bean
    public ConsumerFactory<String, NTCMessageBody> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new JsonDeserializer<>(NTCMessageBody.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

Это поставщик бифункций

public class FailedNTCMessageBodyProvider implements BiFunction<byte[], Headers, NTCMessageBody> {

    @Override
    public NTCMessageBody apply(byte[] t, Headers u) {
        return new NTCBadMessageBody(t);
    }

}

public class NTCBadMessageBody extends NTCMessageBody{

    private final byte[] failedDecode;

    public NTCBadMessageBody(byte[] failedDecode) {
        this.failedDecode = failedDecode;
    }

    public byte[] getFailedDecode() {
        return this.failedDecode;
    }

}

Когда я отправляю только одно испорченное сообщение по теме, я получаю эту ошибку (в цикле):

org.apache.kafka.common.errors.SerializationException: Ошибка десериализации ключ / значение

Я понял, что ErrorHandlingDeserializer2 должен делегировать тип NTCBadMessageBody и продолжить потребление. Я также видел (в режиме отладки), что он никогда не шел в конструкторе класса NTCBadMessageBody.

Кто-нибудь может мне помочь?

Большое спасибо.

1 Ответ

1 голос
/ 30 апреля 2019

ErrorHandlingDeserializer

Когда десериализатору не удается десериализовать сообщение, Spring не может обработать проблему, потому что это происходит до того, как возвращается poll (). Чтобы решить эту проблему, версия 2.2 представила ErrorHandlingDeserializer. Этот десериализатор делегирует реальный десериализатор (ключ или значение). Если делегату не удается десериализовать содержимое записи, ErrorHandlingDeserializer возвращает вместо этого исключение DeserializationException, содержащее причину и необработанные байты. При использовании MessageListener уровня записи, если ключ или значение содержит исключение DeserializationException, контейнер ErrorHandler вызывается с ошибочным ConsumerRecord. При использовании BatchMessageListener сбойная запись передается приложению вместе с оставшиеся записи в пакете, поэтому слушатель приложения должен проверить, является ли ключ или значение в конкретной записи исключением DeserializationException.

Итак, в соответствии с вашим кодом вы используете record-level MessageListener, затем просто добавьте ErrorHandler к Container

Обработка исключений

Если ваш обработчик ошибок реализует этот интерфейс, вы можете, например, соответствующим образом настроить смещения. Например, чтобы сбросить смещение и воспроизвести сообщение об ошибке, вы можете сделать что-то вроде следующего: заметьте, однако, что это упрощенные реализации, и вам, вероятно, понадобится дополнительная проверка в обработчике ошибок.

@Bean
public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
return (m, e, c) -> {
    this.listen3Exception = e;
    MessageHeaders headers = m.getHeaders();
    c.seek(new org.apache.kafka.common.TopicPartition(
            headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
            headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
            headers.get(KafkaHeaders.OFFSET, Long.class));
    return null;
   };
}

Или вы можете сделать пользовательскую реализацию, как в этом примере

@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericRecord>
kafkaListenerContainerFactory()  {

    ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setErrorHandler(new ErrorHandler() {
        @Override
        public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
            String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
            String topics = s.split("-")[0];
            int offset = Integer.valueOf(s.split("offset ")[1]);
            int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);

            TopicPartition topicPartition = new TopicPartition(topics, partition);
            //log.info("Skipping " + topic + "-" + partition + " offset " + offset);
            consumer.seek(topicPartition, offset + 1);
            System.out.println("OKKKKK");
        }

        @Override
        public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {

        }

        @Override
        public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord, Consumer<?,?> consumer) {
            String s = e.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
            String topics = s.split("-")[0];
            int offset = Integer.valueOf(s.split("offset ")[1]);
            int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);

            TopicPartition topicPartition = new TopicPartition(topics, partition);
            //log.info("Skipping " + topic + "-" + partition + " offset " + offset);
            consumer.seek(topicPartition, offset + 1);
            System.out.println("OKKKKK");


        }
    });


    return factory;
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...