Я использую KafkaConsumer
со следующими настройками
enable.auto.commit
= true
auto.commit.interval.ms
= 2000
.. и опрос каждые 5 секунд.У меня есть фиктивная ConsumerInterceptor
реализация
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
System.out.println("onCommit() invoked with records - " + map.size());
Set<Map.Entry<TopicPartition, OffsetAndMetadata>> committedEntries = map.entrySet();
for (Map.Entry<TopicPartition, OffsetAndMetadata> committedEntry : committedEntries) {
System.out.println("committedRecordTopic " + committedEntry.getKey().topic());
System.out.println("committedRecordPartition " + committedEntry.getKey().partition());
System.out.println("committedRecordOffset " + committedEntry.getValue().offset());
}
}
Когда я отправляю запись в Kafka, метод перехватчика onCommit
вызывается неоднократно, и я снова вижу этот тип вывода (как указано выше перехватчиком)и снова
onCommit() invoked with records - 1
committedRecordTopic foo
committedRecordPartition 0
committedRecordOffset 12
Почему это происходит?Если последняя запись была зафиксирована, почему onCommit()
вызывается с той же самой записью?Этого не происходит, когда я переключаюсь на ручную фиксацию с enable.auto.commit
= false
и вызываю commitSync()
в потребительской логике