Как перемещать прошлые недериализуемые сообщения в Spring Kafka 1.3 с Avro - PullRequest
0 голосов
/ 20 февраля 2019

Я не могу перейти на Spring 5, поэтому я застрял с spring-kafka 1.3 и его ограниченной обработкой ошибок.Поэтому у меня нет доступа к обработчикам ошибок ConsumerAwareErrorHandler или SeekToCurrent из spring-kafka 2.

Я использую аннотированный @KafkaListener метод для прослушивания темы, я настроил io.confluent.kafka.serializers.KafkaAvroDeserializer как свойvalue deserializer.

Проблема в том, что, если я заканчиваю с сообщением в моей теме, которое не в формате Avro, цикл опроса KafkaMessageListenerContainer застревает.Десериализатор создает исключение в сообщении, и цикл опроса никогда не просматривает его, поэтому в следующий раз в цикле он пытается десериализовать то же сообщение и продолжает цикл, сбрасывая одну и ту же ошибку тысячи раз в секунду в мой журнал.

Кажется, нет способа получить NeverRetryPolicy или что-нибудь в крайнем направлении, но я могу factory.getContainerProperties().setErrorHandler().К сожалению, я не уверен, что я могу сделать оттуда.

Есть ли что-то, что я могу Autowire в мой обработчик ошибок, который я могу использовать для поиска смещения вперед 1 при ошибке?Не уверен, что это, документы не много говорят о том, что вы можете сделать с помощью ErrorHandler, и большинство примеров, которые я могу найти, относятся к spring-kafka 2.X.Мол, это не десериализовано, я НИЧЕГО не могу сделать с сообщением, оно НИКОГДА не сработает, я хочу избежать повторной попытки его повторить, и кажется, что большинство вопросов Stackoverflow о том, как поступить наоборот.

I 'Мы также видели, что некоторые люди просто оборачивают Deserializer от Avro своим собственным классом, который ест исключение и возвращает ноль.Это лучший план?

1 Ответ

0 голосов
/ 21 февраля 2019

Проблема в том, что десериализация дает сбой задолго до того, как Spring получает данные - проблема в самой Kafka.

В 2.2 мы добавили ErrorHandlingDeserializer2, который оборачивает настоящий десериализатор и отправляет сигнал слушателюконтейнер, чтобы ошибка могла быть отправлена ​​в обработчик ошибок.

В более старых версиях вам нужно было бы написать свою собственную оболочку десериализатора - однако в контейнере нет кода для решения этой ситуации, поэтому ваш блок catchпотребуется вернуть реальный объект, который является сигналом для вашего слушателя, что десериализация не удалась.

Допустим, ваш слушатель получает Invoice.Ваш блок catch может создать подкласс, скажем BadInvoice, который вы затем сможете обнаружить в своем слушателе и отбросить его.

Я также видел, как некоторые люди просто оборачивали десериализатор из Avro своим собственным классом.который ест исключение и возвращает ноль.Это лучший план?

Вы можете вернуть null, если вы никогда не получите реальные нулевые записи, но вам придется добавить @Payload(required = false) к параметру метода.

...