Я не могу перейти на Spring 5, поэтому я застрял с spring-kafka 1.3 и его ограниченной обработкой ошибок.Поэтому у меня нет доступа к обработчикам ошибок ConsumerAwareErrorHandler или SeekToCurrent из spring-kafka 2.
Я использую аннотированный @KafkaListener
метод для прослушивания темы, я настроил io.confluent.kafka.serializers.KafkaAvroDeserializer
как свойvalue deserializer.
Проблема в том, что, если я заканчиваю с сообщением в моей теме, которое не в формате Avro, цикл опроса KafkaMessageListenerContainer застревает.Десериализатор создает исключение в сообщении, и цикл опроса никогда не просматривает его, поэтому в следующий раз в цикле он пытается десериализовать то же сообщение и продолжает цикл, сбрасывая одну и ту же ошибку тысячи раз в секунду в мой журнал.
Кажется, нет способа получить NeverRetryPolicy или что-нибудь в крайнем направлении, но я могу factory.getContainerProperties().setErrorHandler()
.К сожалению, я не уверен, что я могу сделать оттуда.
Есть ли что-то, что я могу Autowire в мой обработчик ошибок, который я могу использовать для поиска смещения вперед 1 при ошибке?Не уверен, что это, документы не много говорят о том, что вы можете сделать с помощью ErrorHandler, и большинство примеров, которые я могу найти, относятся к spring-kafka 2.X.Мол, это не десериализовано, я НИЧЕГО не могу сделать с сообщением, оно НИКОГДА не сработает, я хочу избежать повторной попытки его повторить, и кажется, что большинство вопросов Stackoverflow о том, как поступить наоборот.
I 'Мы также видели, что некоторые люди просто оборачивают Deserializer от Avro своим собственным классом, который ест исключение и возвращает ноль.Это лучший план?