Получение исключения сериализации при использовании из очереди Kafka с использованием Spring Boot - PullRequest
2 голосов
/ 26 мая 2019

Я использую Spring Boot с Kafka для получения сообщений из очереди Kafka.

Однако, если есть какая-либо ошибка в разборе полезной нагрузки Джексона.

Сообщение остается застрявшим и сохраняетпри повторной попытке потребления, все время выдавая исключение при разборе.

Я пытался использовать ErrorHandlingDeserializer2 в конфигурации Kafka, а также отображал обработчик ошибок, но проблема все равно сохраняется.

@Configuration
@EnableKafka
public class KafkaConfig 
{

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
        props.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, StringDeserializer.class);

        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
        props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);

        props.put(ConsumerConfig.GROUP_ID_CONFIG, "${connecto.group-id}");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);

        return props;
    }

    @Bean
    public ConsumerFactory<String, UserDto> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(
                consumerConfigs(),
                new StringDeserializer(),
                new JsonDeserializer<>(UserDto.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, UserDto> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, UserDto> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        factory.setErrorHandler(new MosaicKafkaErrorHandler());

        return factory;
    }

    @Bean
    public KafkaTemplate kafkaTemplate() 
    {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, UserSearchDto> producerFactory() 
    {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }
}

public class MosaicKafkaErrorHandler implements ContainerAwareErrorHandler{

@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
        MessageListenerContainer container) {
    thrownException.printStackTrace();

    }

}

IВы хотите, чтобы оно было реализовано таким образом, чтобы при возникновении исключения при обработке он просто регистрировал это сообщение и переходил к следующей записи, а не застревал с ним.

Ниже приведена ошибка-

Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `com.gxxx.mxxx.common.dto.UserDetailsDto` (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value ('{"field1":20000,"field2":20000,"type":""}')

at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1230)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:741)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:698)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition kafakaqueue.identify-1 at offset 326355379. If needed, please seek past the record to continue consumption.

1 Ответ

0 голосов
/ 26 мая 2019

Чтобы решить эту проблему, версия 2.2 представила ErrorHandlingDeserializer2.Этот десериализатор делегирует реальный десериализатор (ключ или значение).Если делегату не удается десериализовать содержимое записи, ErrorHandlingDeserializer2 возвращает нулевое значение и DeserializationException в заголовке, который содержит причину и необработанные байты.При использовании уровня записи MessageListener, если ConsumerRecord содержит заголовок DeserializationException для ключа или значения, контейнер ErrorHandler вызывается с ошибочным ConsumerRecord.Запись не передается слушателю.

Однако, поскольку вы используете более раннюю версию, вы можете использовать хак ниже.

     @Override
        public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
                MessageListenerContainer container) {
            thrownException.printStackTrace();
            if (thrownException instanceOf SerializationException){
                String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
                String topics = s.split("-")[0];
                int offset = Integer.valueOf(s.split("offset ")[1]);
                int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);

                TopicPartition topicPartition = new TopicPartition(topics, partition);
                consumer.seek(topicPartition, offset + 1);  
               }
            }
...