onPartitionsRevokedBeforeCommit против onPartitionsRevokedAfterCommit - PullRequest
0 голосов
/ 10 апреля 2019

Я пишу для потребителя Kafka и в целях обучения на этот раз я подумал об использовании реализации Spring-Kafka. До сих пор я использовал Java API для написания потребителей.

Я хочу управлять смещением вручную, поэтому я искал что-то похожее на ConsumerRebalanceListener в пакете Spring-Kafka. И к моему успеху я наткнулся на ConsumerAwareRebalanceListener в Spring, который можно использовать вместо ConsumerRebalanceListener.

Но когда я посмотрел на интерфейс ConsumerAwareRebalanceListener, я увидел 2 метода - onPartitionsRevokedBeforeCommit и onPartitionsRevokedAfterCommit, который недоступен в Java-API Kafka.

Пожалуйста, кто-нибудь может объяснить, как / где я могу использовать эти методы?

P.S. Посмотрел реализацию Spring-Kafka, но не совсем понял, где это будет полезно.

1 Ответ

1 голос
/ 10 апреля 2019

Spring kafka имеет потребительскую модель, управляемую сообщениями; вы предоставляете прослушиватель сообщения POJO, а инфраструктура выполняет опрос и передает сообщение в прослушиватель, по одному или в пакете.

Имеет различные режимы для фиксации смещений (предпочитает отключать enable.auto.commmit в клиенте).

Существует два режима ручного подтверждения AckMode.MANUAL и AckMode.MANUAL_IMMEDIATE; в этих режимах мы передаем объект Acknowledgment компоненту-слушателю и вы вызываете ack.acknowledge().

Когда режим MANUAL_IMMEDIATE, пока вы вызываете acknowledge() в потоке потребителя, потребитель вызывается напрямую.

Когда режим MANUAL, смещение добавляется во внутреннюю очередь, и фиксация будет выполняться в конце обработки результатов опроса.

Аналогично, существует несколько «автоматических» режимов подтверждения; основные из них RECORD и BATCH, где контейнер фиксирует смещения, когда слушатель обычно выходит. В режиме записи фиксация отправляется после обработки каждой записи, в пакетном режиме фиксация выполняется после обработки всех результатов опроса.

Фиксация смещений в партиях более эффективна, но увеличивает риск повторных поставок.

Мы также фиксируем любые ожидающие смещения, когда происходит перебалансировка.

Итак, почему два onPartitionsRevoked* метода?

При использовании MANUAL, BATCH или других AckMode, которые могут иметь ожидающие смещения для фиксации, onPartitionsRevokedBeforeCommit() вызывается до того, как эти ожидающие смещения фиксируются, и onPartitionsRevokedAfterCommit() вызывается после того, как эти смещения фиксируются.

Итак, consumer.position() может возвращать разные результаты в каждом методе.

Большинство людей заинтересуются onPartitionsRevokedAfterCommit(), но мы чувствовали, что должны предоставить оба варианта.

Если вы используете AckMode.MANUAL_IMMEDIATE или AckMode.RECORD, не должно быть никакой разницы, поскольку не будет ожидающих подтверждений.

Однако, поскольку прослушиватель вызывается в потоке-потребителе, во время опроса разница будет действительно только при использовании одного из AckMode s, основанного на времени или количестве. С другими ackmodes мы уже зафиксировали смещения.

Надеюсь, это понятно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...