Kafks consumer.poll не возвращает данных - PullRequest
0 голосов
/ 04 марта 2019

У меня есть два брокера Kafka (2.11-0.11.0.1).Коэффициент репликации тем по умолчанию установлен равным 2. Производители записывают данные только в нулевой раздел.

И у меня есть запланированный исполнитель, который периодически запускает задачу.Когда он использует тему с небольшим количеством записей в минуту (100 в минуту), он работает как шарм.Но для огромных тем (10K в минуту) метод poll не возвращает данных.

Задача:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public final class TopicToDbPump implements Runnable {
  private static final Logger log = LoggerFactory.getLogger(TopicToDbPump.class);
  private final String topic;
  private final TopicPartition topicPartition;
  private final Properties properties;

  public TopicToDbPump(String topic, Properties properties) {
    this.topic = topic;
    topicPartition = new TopicPartition(topic, 0);
    this.properties = properties;
  }

  @Override
  public void run() {
    try (final Consumer<String, String> consumer = new KafkaConsumer<>(properties)) {
      consumer.assign(Collections.singleton(topicPartition));
      final long offset = readOffsetFromDb(topic);
      consumer.seek(topicPartition, offset);
      final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
      if (records.isEmpty()) {
        log.debug("No data from topic " + topic + " available");
        return;
      }
      saveData(records.records(topic));
    } catch (Throwable t) {
      log.error("Etl process " + topic + " failed with exception", t);
    }
  }
}

Параметры потребителей:

"bootstrap.servers" = "host-1:9092,host-2:9092",
"group.id" = "my-group",
"enable.auto.commit" = "false",
"key.deserializer" = "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" = "org.apache.kafka.common.serialization.StringDeserializer",
"max.partition.fetch.bytes": "50000000",
"max.poll.records": "10000"

Что не так?

1 Ответ

0 голосов
/ 04 марта 2019

API-интерфейс Kafka Consumer не гарантирует, что при первом вызове poll() будут возвращены какие-либо данные.

Потребитель должен сначала подключиться к кластеру, найти лидеров для всех разделов, которым он назначен.Как вы понимаете, это может занять несколько секунд, поэтому маловероятно, что данные будут доставлены немедленно.

Вместо этого сначала следует вызвать poll(), если данные не возвращаются первыми.

...