Как обрабатывать SerializationException после десериализации - PullRequest
0 голосов
/ 02 июля 2018

Я использую реестр Avro и Schema с моей установкой Spring Kafka.

Я хотел бы как-то обработать SerializationException, который может быть брошен во время десериализации.

Я нашел следующие два ресурса:

https://github.com/spring-projects/spring-kafka/issues/164

Как настроить spring-kafka на игнорирование сообщений в неправильном формате?

Эти ресурсы предполагают, что я возвращаю ноль вместо SerializationException при десериализации и прислушиваюсь к KafkaNull. Это решение отлично работает.

Однако я хотел бы иметь возможность генерировать исключение вместо возврата null.

KIP-161 и KIP-210 обеспечивают улучшенные функции обработки исключений. Я нашел некоторые ресурсы, упоминающие KIP-161 в Spring Cloud, но ничего особенного в Spring-Kafka.

Кто-нибудь знает, как поймать SerializationException в Spring Boot?

Я использую Spring Boot 2.0.2

Редактировать: я нашел решение.

Я бы скорее бросил исключение и поймал его, чем возвращал ноль или KafkaNull. Я использую свой собственный сериализатор Avro и десериализатор в нескольких различных проектах, некоторые из которых не Spring. Если бы я изменил свой сериализатор Avro и десериализатор, то нужно было бы изменить некоторые другие проекты, чтобы ожидать, что десериализатор вернет ноль.

Я бы хотел закрыть контейнер, чтобы я не потерял ни одного сообщения. SerializationException никогда не следует ожидать в производстве. Исключение SerializationException должно иметь место только в том случае, если не работает реестр схем или если неформатированное сообщение каким-либо образом отправлено в рабочую кафку. В любом случае, исключение SerializationException должно происходить очень редко, и если это произойдет, то я хочу закрыть контейнер так, чтобы сообщения не терялись, и я мог бы исследовать проблему.

Только учтите, что выловятся все исключения из вашего потребительского контейнера. В моем конкретном случае я просто хочу отключить, только если это SerializationException

public class SerializationExceptionHandler extends ContainerStoppingErrorHandler {

    @Override
    public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
                       MessageListenerContainer container) {

        //Only call super if the exception is SerializationException
        if (thrownException instanceof SerializationException) {
            //This will shutdown the container.
            super.handle(thrownException, records, consumer, container);
        } else {
            //Wrap and re-throw the exception
            throw new KafkaException("Kafka Consumer Container Error", thrownException);
        }
    }
}

Этот обработчик передается в потребительский контейнер. Ниже приведен пример KafkaListenerContainerFactory боб.

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory(JpaTransactionManager jpa, KafkaTransactionManager<?, ?> kafka) {

    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(1);
    factory.getContainerProperties().setPollTimeout(3000);

    factory.getContainerProperties().setErrorHandler(new SerializationExceptionHandler());

    factory.getContainerProperties().setTransactionManager(chainedTxM(jpa, kafka));
    return factory;
}

1 Ответ

0 голосов
/ 02 июля 2018

Весна ничего не может сделать; десериализация происходит до того, как потребитель получит какие-либо данные. Вам нужно улучшить десериализатор.

Однако я хотел бы иметь возможность выдавать исключение вместо возврата null.

Это ничего не поможет, так как Кафка не знает, как справиться с исключением. Снова; все это происходит до того, как данные становятся доступны, поэтому наилучшим методом является возврат значения null (или какого-либо другого специального значения).

EDIT

В 2.2 мы добавили десериализатор обработки ошибок , который делегирует действительному десериализатору и возвращает ноль, за исключением исключения в заголовке; контейнер слушателя затем передает это непосредственно обработчику ошибок вместо слушателя.

...