Я играл с базовой настройкой Zookeeper и Kafka, чтобы узнать, как ее использовать, но у меня возникли проблемы с потребителем.Когда Kafka недоступен, вызов метода poll()
зависает до тех пор, пока он не возвращается в оперативное состояние.
Версия Kafka: 0.10.1.0
Мой код выглядит следующим образом:
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(topics);
while (!stopped) {
// If by any reason Kafka is not available this call will hang
// until Kafka is back online.
records = consumer.poll(timeout);
for (ConsumerRecord<String, byte[]> record : records) {
process(record);
}
Thread.sleep(sleepTime);
}
Я читал, что когда я звоню на poll()
, потребитель будет пытаться подключиться к Kafka на неопределенный срок, пока он не вернется в оперативный режим или пока не будет вызван consumer.wakeup()
.
Iхотите, чтобы код действовал по-другому, когда Кафка не в сети. Существует ли какой-либо способ ограничения повторных попыток потребителя или его сбоя при опросе несуществующей кафки?