Автокоммит Vertx Kafka с явным опросом - PullRequest
1 голос
/ 21 февраля 2020

Пытался написать потребитель kafka в java, используя Vertx.

Мне нужно установить автоматическую фиксацию как false (указать c вариант использования).

Ниже приведен код для явного опроса

consumer.subscribe("test", ar -> {

if (ar.succeeded()) {
System.out.println("Consumer subscribed");

vertx.setPeriodic(1000, timerId -> {

  consumer.poll(100, ar1 -> {

    if (ar1.succeeded()) {

      KafkaConsumerRecords<String, String> records = ar1.result();
      for (int i = 0; i < records.size(); i++) {
        KafkaConsumerRecord<String, String> record = records.recordAt(i);
        System.out.println("key=" + record.key() + ",value=" + record.value() +
          ",partition=" + record.partition() + ",offset=" + record.offset());
      }
    }
  });

});

}});

И для ручной фиксации:

consumer.commit(ar -> {

 if (ar.succeeded()) {
 System.out.println("Last read message offset committed");
 }
 });

Мой вопрос заключается в том, что если частота опроса установлена ​​на 1000 мс, а фиксация выполняется вручную, что произойдет, если сообщение не будет обработано в течение 1000 мс?

Будет ли выполнен следующий опрос до обработки первого набора сообщений? Если да, будет ли он снова получать тот же набор сообщений (которые еще не зафиксированы) или более новый набор сообщений?

1 Ответ

0 голосов
/ 21 февраля 2020

Глядя на документацию KafkaConsumer#poll:

В каждом опросе потребитель будет пытаться использовать последнее использованное смещение в качестве начального смещения и последовательно выбирать. Последнее использованное смещение может быть установлено вручную посредством поиска (TopicPartition, long) или автоматически установлено как последнее подтвержденное смещение для подписанного списка разделов

Последнее использованное смещение относится к внутреннее состояние KafkaConsumer, а не то, в котором оно зафиксировано. Это означает, что он не будет снова получать те же сообщения, но будет получать следующие 100.

...