Потребитель Кафки висит на опросе, когда Кафка не работает - PullRequest
0 голосов
/ 10 мая 2018

Я играл с базовой настройкой Zookeeper и Kafka, чтобы узнать, как ее использовать, но у меня возникли проблемы с потребителем.Когда Kafka недоступен, вызов метода poll() зависает до тех пор, пока он не возвращается в оперативное состояние.

Версия Kafka: 0.10.1.0

Мой код выглядит следующим образом:

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(topics);

while (!stopped) {
    // If by any reason Kafka is not available this call will hang
    // until Kafka is back online.
    records = consumer.poll(timeout);

    for (ConsumerRecord<String, byte[]> record : records) {
        process(record);
    }

    Thread.sleep(sleepTime);
}

Я читал, что когда я звоню на poll(), потребитель будет пытаться подключиться к Kafka на неопределенный срок, пока он не вернется в оперативный режим или пока не будет вызван consumer.wakeup().

Iхотите, чтобы код действовал по-другому, когда Кафка не в сети. Существует ли какой-либо способ ограничения повторных попыток потребителя или его сбоя при опросе несуществующей кафки?

1 Ответ

0 голосов
/ 10 мая 2018

К сожалению, это все еще проблема. Многие потребительские методы могут зависать при разных сценариях.

Выполняется предложение по улучшению Kafka, KIP-266 , чтобы добавить тайм-ауты в методы Consumer во избежание зависаний.

Насколько я знаю, вызов wakeup() из другого потока - лучший обходной путь


РЕДАКТИРОВАТЬ: Начиная с Kafka 2.0.0, все звонки потребителей могут принимать тайм-аут. Это позволяет восстановить контроль в случае падения брокеров.

...