java KafkaConsumer никогда не получит результаты - PullRequest
0 голосов
/ 29 августа 2018

Я новичок в kafka, у меня есть следующий пример кода:

KafkaConsumer<String,String> kc = new KafkaConsumer<String, String>(props);
while(true) {
    List<String> topicNames = Arrays.asList(topics.split(","));
    if (!kc.assignment().isEmpty()) {
        kc.unsubscribe();
    }
    kc.subscribe(topicNames);
    ConsumerRecords<String, String> recv = kc.poll(1000L);
    if (!recv.isEmpty()) {
        System.out.println("NOT EMPTY");
    }
}

Recv всегда пуст, но если я пытаюсь увеличить тайм-аут пула, записи возвращаются, даже если я отключаю часть отказа от подписки.

Я взял этот фрагмент кода у проприетарного программного обеспечения и не могу его изменить.

Итак, мой вопрос: это только проблема синхронизации или есть еще?

Ответы [ 2 ]

0 голосов
/ 29 августа 2018

Многое происходит, когда потребитель (пере) подписывается на тему.

Очень грубо и, насколько я помню, потребитель будет:

  • запрос информации о кластере
  • запрос метаданных группы потребителей
  • сделать запрос JOIN_GROUP
  • назначить определенные разделы

Базовые механизмы еще более сложны, если в одной группе больше потребителей. Это связано с тем, что разделы должны быть переназначены между всеми потребителями в группе.

Вот почему:

  • 1000 миллис может быть недостаточно для всего этого, и вы ничего не опросили вовремя
  • вы что-то опросили, когда увеличили время ожидания, потому что Кафке удалось выполнить все эти операции начальной загрузки
  • вы что-то опросили, когда удалили отписку от темы, потому что, скорее всего, ваш потребитель уже подписан

Так что есть проблема с синхронизацией. И я думаю, что есть нечто большее - отмена / подписка на тему в бесконечном цикле для меня не имеет смысла (см. Другой ответ).

0 голосов
/ 29 августа 2018

Вы должны подписаться на свои темы только один раз в начале. Как это:

 final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
 consumer.subscribe(Arrays.asList("foo", "bar"));

 while (true) {
     final ConsumerRecords<String, String> records = consumer.poll(100);
     for (ConsumerRecord<String, String> record : records)
         System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
 }
...