Я использую реестр Avro и Schema с моей установкой Spring Kafka.
Я хотел бы как-то обработать SerializationException
, который может быть брошен во время десериализации.
Я нашел следующие два ресурса:
https://github.com/spring-projects/spring-kafka/issues/164
Как настроить spring-kafka на игнорирование сообщений в неправильном формате?
Эти ресурсы предполагают, что я возвращаю ноль вместо SerializationException
при десериализации и прислушиваюсь к KafkaNull
. Это решение отлично работает.
Однако я хотел бы иметь возможность генерировать исключение вместо возврата null.
KIP-161 и KIP-210 обеспечивают улучшенные функции обработки исключений. Я нашел некоторые ресурсы, упоминающие KIP-161 в Spring Cloud, но ничего особенного в Spring-Kafka.
Кто-нибудь знает, как поймать SerializationException
в Spring Boot?
Я использую Spring Boot 2.0.2
Редактировать: я нашел решение.
Я бы скорее бросил исключение и поймал его, чем возвращал ноль или KafkaNull
. Я использую свой собственный сериализатор Avro и десериализатор в нескольких различных проектах, некоторые из которых не Spring. Если бы я изменил свой сериализатор Avro и десериализатор, то нужно было бы изменить некоторые другие проекты, чтобы ожидать, что десериализатор вернет ноль.
Я бы хотел закрыть контейнер, чтобы я не потерял ни одного сообщения. SerializationException никогда не следует ожидать в производстве. Исключение SerializationException должно иметь место только в том случае, если не работает реестр схем или если неформатированное сообщение каким-либо образом отправлено в рабочую кафку. В любом случае, исключение SerializationException должно происходить очень редко, и если это произойдет, то я хочу закрыть контейнер так, чтобы сообщения не терялись, и я мог бы исследовать проблему.
Только учтите, что выловятся все исключения из вашего потребительского контейнера. В моем конкретном случае я просто хочу отключить, только если это SerializationException
public class SerializationExceptionHandler extends ContainerStoppingErrorHandler {
@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
MessageListenerContainer container) {
//Only call super if the exception is SerializationException
if (thrownException instanceof SerializationException) {
//This will shutdown the container.
super.handle(thrownException, records, consumer, container);
} else {
//Wrap and re-throw the exception
throw new KafkaException("Kafka Consumer Container Error", thrownException);
}
}
}
Этот обработчик передается в потребительский контейнер. Ниже приведен пример
KafkaListenerContainerFactory
боб.
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory(JpaTransactionManager jpa, KafkaTransactionManager<?, ?> kafka) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(1);
factory.getContainerProperties().setPollTimeout(3000);
factory.getContainerProperties().setErrorHandler(new SerializationExceptionHandler());
factory.getContainerProperties().setTransactionManager(chainedTxM(jpa, kafka));
return factory;
}