Spring Kafka @KafkaListener Ошибка десериализации простого строкового потребителя - PullRequest
0 голосов
/ 20 апреля 2020

Я пытаюсь прочитать сообщение из чистой строковой схемы kafka topi c, используя Spring Kafka API, и затем я получаю сообщение об ошибке от Spring kafka API (нет доступной трассировки стека, в противном случае эта работа не будет такой болезненной), как: " Ошибка десериализации ключа / значения для раздела sometopicname-2 со смещением 98. Если необходимо, просмотрите запись, чтобы продолжить потребление.

Kafka Версии:

<groupId>org.springframework.kafka</groupId>

<artifactId>spring-kafka</artifactId>

<!-- <version>2.4.5.RELEASE</version> -->

Для kafka topi c он размещен на конфлюэнтной платформе с включенной службой реестра схем.

Схема из реестра реестра API: "schema": "\"string\""

Consumer Config:

    protected Map<String, Object> initializeCommonConsumerConfig() {
    final Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, appConfig.getConsumerKafkaBrokerAddress());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);

    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, getFetchMaxWaitInMs());
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, getMaxPartitionFetchBytes());
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, getMinFetchBytes());
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, getMaxPollRecords());


    props.put("security.protocol", "PLAINTEXT");


    return props;
}
    private ConcurrentKafkaListenerContainerFactory<String,String> buildConcurrentKafkaListenerFactory() {

    Map<String, Object> properties = initializeCommonConsumerConfig();
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, appConfig.getConsumerGroupId());
    final ConcurrentKafkaListenerContainerFactory<String, String> factory = 
            new ConcurrentKafkaListenerContainerFactory<String, String>();
    factory.setConcurrency(1);
    factory.setBatchListener(false);
    factory.setConsumerFactory(
            new DefaultKafkaConsumerFactory<>(properties));
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    //by default spring kafka is configured to send ack on error, disabling it 
    factory.getContainerProperties().setAckOnError(false);

    factory.setErrorHandler(new ErrorHandler() {
        @Override
        public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
            logger.error(thrownException.toString());
            String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
            logger.debug(s);
            String topicsDashPartition = s.split(" at offset ")[0];
            logger.debug("topic - partition is: "+topicsDashPartition);
            String partitionS = topicsDashPartition.split("-")[topicsDashPartition.split("-").length-1];
            logger.debug("partitionS is "+partitionS);
            String topics = topicsDashPartition.substring(0,topicsDashPartition.length()-partitionS.length()-1);
            int offset = Integer.valueOf(s.split("offset ")[1]);
            logger.debug("offset is: " + offset);
            int partition = Integer.valueOf(partitionS);

            TopicPartition topicPartition = new TopicPartition(topics, partition);
            logger.info("Skipping " + topics + "-" + partition + " offset " + offset);
            consumer.seek(topicPartition, offset + 1);
        }


        @Override
        public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord, Consumer<?,?> consumer) {
            logger.error(e.toString());
            String s = e.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
            logger.debug(s);
            String topicsDashPartition = s.split(" at offset ")[0];
            logger.debug("topic - partition is: "+topicsDashPartition);
            String partitionS = topicsDashPartition.split("-")[topicsDashPartition.split("-").length-1];
            logger.debug("partitionS is "+partitionS);
            String topics = topicsDashPartition.substring(0,topicsDashPartition.length()-partitionS.length()-1);
            int offset = Integer.valueOf(s.split("offset ")[1]);
            logger.debug("offset is: " + offset);
            int partition = Integer.valueOf(partitionS);

            TopicPartition topicPartition = new TopicPartition(topics, partition);
            logger.info("Skipping " + topics + "-" + partition + " offset " + offset);
            consumer.seek(topicPartition, offset + 1);

        }

        @Override
        public void handle(Exception thrownException, ConsumerRecord<?, ?> data) {
            // TODO Auto-generated method stub

        }
    });
    return factory;
}



@Bean(name = CONTAINERFACTORY)
ConcurrentKafkaListenerContainerFactory<String, String> sampleConcurrentKafkaListenerContainerFactory() {
    return  buildConcurrentKafkaListenerFactory();
}

@Bean
public KafkaConsumer kafkaConsumer() {
    return new KafkaConsumer();
}

И моя функция прослушивателя выглядит следующим образом:

    @KafkaListener(id = "${kafka.consumer.groupid}", topics = "${kafka.consumer.topic}", containerFactory = com.rbc.wm.br.config.KafkaConsumerConfig.CONTAINERFACTORY)
public void listen(final Acknowledgment ack, final ConsumerRecord<String, Object> message) {
    String rtxml = null;
    try {
        //TODO internal logic
        logger.info("Got message with key {} on offset {}", message.key(),message.offset());
        logger.info(message.value().toString());

        /*
         * Logic to decode retrieve record
         * */
        ack.acknowledge();
    }
    catch(Exception ex) {
        logger.error("Exception occurred while decoding {}. Incorrect or Corrupted message.", message.key(), ex);
        ack.acknowledge();
    }
}

Есть ли какая-либо конфигурация, которую я пропустил при настройке этого потребителя?

Или есть способ увидеть всю трассировку стека в исключении, а не только одно сообщение об исключении от Spring Kafka? Я пытаюсь запустить в режим отладки, однако его там еще не было

Спасибо, что прочитали мой вопрос, и, пожалуйста, посоветуйте, если здесь что-то не так.

...