Когда включена автоматическая фиксация Kafka, одно и то же смещение фиксируется несколько раз? - PullRequest
0 голосов
/ 27 мая 2018

Давайте предположим, enable.auto.commit = true , и давайте предположим, что тема, из которой я читаю сообщение, имеет некоторый длительный период бездействия (например, нет сообщений, скажем, 48 часов).В результате последовательный вызов poll () не вернет ни одного сообщения в течение 48 часов, у меня такой вопрос:

Будет ли зафиксировано смещение последнего возвращенного сообщения (то же самое для 48 часов) снова и снова каждый автоматически.commit.interval.ms в теме __ consumer_offsets , которая сжимается и срок действия которой контролируется offsets.retention.minutes ?

Фиксация снова и сноваможет предотвратить истечение срока действия записи в теме __ consumer_offsets и ее удаление в определенный момент.

1 Ответ

0 голосов
/ 28 мая 2018

Это интересный вариант.

Редактировать : на основе недавнего комментария, обновив его.Обновленные части зачеркнуты и помечены либо явно, либо курсивом .

Я бы пошел с "Нет" "Да" т. Е. Смещение последнего возвращенного сообщения будет НЕ подтверждено снова и снова, если в теме не появилось никаких новых сообщений.

Ниже приведено пояснение.

Типичный потребительский пример будет выглядеть примерно так:

Properties props = new Properties();
<other-properties>
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}

Итак, обычно ответственность за смещение коммитов лежит на потребителе и управляется циклом опроса.

Теперь, всценарий, который вы описали после последнего коммита, каждый вызов метода poll() вернет пустую карту. Итак, если нет записей, возвращаемых poll(), то нет новых смещений для фиксации.

Вот как я проследил исходный код Кафки и пришел к такому выводу.Следующий оператор возврата взят из определения метода poll(), данного здесь

return ConsumerRecords.empty();

Определение метода empty(), доступного в этом файле .

Редактировать : Следующая часть является новым дополнением на основе комментария от Гвен.

Однако, прежде чем вернуть пустую карту, есть еще один метод poll() (сидящий вConsumerCoordinator class) вызывается с помощью метода poll() класса KafkaConsumer, который согласно определению, данному здесь .обрабатывает периодические коммиты смещения, если они включены с помощью следующего метода:

public void maybeAutoCommitOffsetsAsync(long now) {
    if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
        this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
        doAutoCommitOffsetsAsync();
    }
}

Надеюсь, это поможет!

...