Spring kafka имеет потребительскую модель, управляемую сообщениями; вы предоставляете прослушиватель сообщения POJO, а инфраструктура выполняет опрос и передает сообщение в прослушиватель, по одному или в пакете.
Имеет различные режимы для фиксации смещений (предпочитает отключать enable.auto.commmit
в клиенте).
Существует два режима ручного подтверждения AckMode.MANUAL
и AckMode.MANUAL_IMMEDIATE
; в этих режимах мы передаем объект Acknowledgment
компоненту-слушателю и вы вызываете ack.acknowledge()
.
Когда режим MANUAL_IMMEDIATE
, пока вы вызываете acknowledge()
в потоке потребителя, потребитель вызывается напрямую.
Когда режим MANUAL
, смещение добавляется во внутреннюю очередь, и фиксация будет выполняться в конце обработки результатов опроса.
Аналогично, существует несколько «автоматических» режимов подтверждения; основные из них RECORD
и BATCH
, где контейнер фиксирует смещения, когда слушатель обычно выходит. В режиме записи фиксация отправляется после обработки каждой записи, в пакетном режиме фиксация выполняется после обработки всех результатов опроса.
Фиксация смещений в партиях более эффективна, но увеличивает риск повторных поставок.
Мы также фиксируем любые ожидающие смещения, когда происходит перебалансировка.
Итак, почему два onPartitionsRevoked*
метода?
При использовании MANUAL, BATCH или других AckMode
, которые могут иметь ожидающие смещения для фиксации, onPartitionsRevokedBeforeCommit()
вызывается до того, как эти ожидающие смещения фиксируются, и onPartitionsRevokedAfterCommit()
вызывается после того, как эти смещения фиксируются.
Итак, consumer.position()
может возвращать разные результаты в каждом методе.
Большинство людей заинтересуются onPartitionsRevokedAfterCommit()
, но мы чувствовали, что должны предоставить оба варианта.
Если вы используете AckMode.MANUAL_IMMEDIATE
или AckMode.RECORD
, не должно быть никакой разницы, поскольку не будет ожидающих подтверждений.
Однако, поскольку прослушиватель вызывается в потоке-потребителе, во время опроса разница будет действительно только при использовании одного из AckMode
s, основанного на времени или количестве. С другими ackmodes мы уже зафиксировали смещения.
Надеюсь, это понятно.