Метод Кафки ConsumerInterceptor onCommit () вызывается непрерывно? - PullRequest
0 голосов
/ 03 декабря 2018

Я использую KafkaConsumer со следующими настройками

  • enable.auto.commit = true
  • auto.commit.interval.ms = 2000

.. и опрос каждые 5 секунд.У меня есть фиктивная ConsumerInterceptor реализация

@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
    System.out.println("onCommit() invoked with records - " + map.size());
    Set<Map.Entry<TopicPartition, OffsetAndMetadata>> committedEntries = map.entrySet();

    for (Map.Entry<TopicPartition, OffsetAndMetadata> committedEntry : committedEntries) {    
        System.out.println("committedRecordTopic " + committedEntry.getKey().topic());
        System.out.println("committedRecordPartition " + committedEntry.getKey().partition());
        System.out.println("committedRecordOffset " + committedEntry.getValue().offset());
    }

}

Когда я отправляю запись в Kafka, метод перехватчика onCommit вызывается неоднократно, и я снова вижу этот тип вывода (как указано выше перехватчиком)и снова

onCommit() invoked with records - 1
committedRecordTopic foo
committedRecordPartition 0
committedRecordOffset 12

Почему это происходит?Если последняя запись была зафиксирована, почему onCommit() вызывается с той же самой записью?Этого не происходит, когда я переключаюсь на ручную фиксацию с enable.auto.commit = false и вызываю commitSync() в потребительской логике

...