Опрос потребителей Kafka работает бесконечно и ничего не возвращает - PullRequest
0 голосов
/ 10 февраля 2020

Я столкнулся с проблемой с KafkaConsumer.poll (время ожидания), в котором он работает бесконечно и никогда не выходит из метода. Поймите, что это может быть связано с подключением, и я иногда видел его немного противоречивым. Как мне справиться с этим, если опрос перестает отвечать? Ниже приведен фрагмент от KafkaConsumer.poll ()

public ConsumerRecords<K, V> poll(final Duration timeout) {
    return poll(time.timer(timeout), true);
}

, и я звоню по вышеупомянутому отсюда:

Duration timeout = Duration.ofSeconds(30);
    while (true) {
        final ConsumerRecords<recordID, topicName> records = consumer.poll(timeout);
        System.out.println("record count is" + records.count());
}

Я получаю следующую ошибку :

org. apache .kafka.common.errors.SerializationException: Ошибка десериализации ключа / значения для раздела со смещением 2. При необходимости, пожалуйста, просмотрите запись, чтобы продолжить потребление.

1 Ответ

0 голосов
/ 17 февраля 2020

Я наткнулся на некоторую полезную информацию, пытаясь решить проблему, с которой столкнулся выше. Я приведу фрагмент кода, который должен справиться с этим, но перед этим важно знать, что вызывает это.

При создании или использовании сообщения или данных для Apache Kafka нам нужна структура схемы для этого сообщения или данных, в моем случае схема Avro. Если в Kafka создается конфликт сообщения, который конфликтует с этой схемой сообщения, это повлияет на потребление.

Добавьте приведенный ниже код в свой потребительский топи c в методе, где он использует записи - -

не забудьте импортировать следующие пакеты:

import org. apache .kafka.common.TopicPartition;
import org.jsoup.SerializationException;

try {
        while (true) {
            ConsumerRecords<String, GenericRecord> records = null;
            try {
                records = consumer.poll(10000);
            } catch (SerializationException e) {
                String s = e.getMessage().split("Error deserializing key/value 
for partition ")[1].split(". If needed, please seek past the record to 
continue consumption.")[0];
                String topics = s.split("-")[0];
                int offset = Integer.valueOf(s.split("offset ")[1]);
                int partition = Integer.valueOf(s.split("-")[1].split(" at") . 
   [0]);

                TopicPartition topicPartition = new TopicPartition(topics, 
 partition);
                //log.info("Skipping " + topic + "-" + partition + " offset " 
 + offset);
                consumer.seek(topicPartition, offset + 1);
            }


            for (ConsumerRecord<String, GenericRecord> record : records) {

                System.out.printf("value = %s \n", record.value());


            }

        }


    } finally {
        consumer.close();
    }
...