Как установить свойства производителя кафки для атомного потребителя кафки - PullRequest
2 голосов
/ 25 октября 2019

Я просматривал эту статью , в которой объясняется, как обеспечить обработку сообщения ровно один раз, выполнив следующее:

  • Чтение (тема, раздел, смещение) из базы данных при запуске/ restart
  • Чтение сообщения из определенного (тема, раздел, смещение)
  • Атомно делать следующие вещи (например, в той же транзакции базы данных):
    • Обработка сообщения
    • Фиксировать смещение в базе данных как (тема, раздел, смещение)
    • Вручную зафиксировать смещение в Kafka, вызвав consumer.commitAsync() или consumer.commitSync()

Я сомневаюсь, каков эффект установки разных значений для разных потребительских свойств:

  1. enable.auto.commit
    Как мне установить это свойство? true или false? В статье говорится, что мы должны установить его на false. Но что может пойти не так, если я установлю true? В этом я сохраняю смещение для внешней базы данных. Таким образом, после сбоя, когда потребитель подключается к сети, он начинает потреблять данные со смещения, сохраненного в базе данных. Так что, я чувствую, значение этого свойства не влияет на запуск / перезапуск.
    Также я не чувствую, что будет какое-либо влияние различных значений этого свойства в пределах одного прогона потребителя, так как смещение используется для чтения следующего сообщения имы фиксируем это вручную или автоматически не имеет никакого эффекта (это будет то же самое смещение).

  2. auto.offset.reset
    Существует два основных значения этого свойства latest и earliest. Если установлено значение latest, это заставит потребителя читать сообщения, помещенные после, то есть после запуска потребителя. Если установлено значение earliest, потребитель будет читать из первого непрочитанного сообщения. Так как это влияет на то, откуда потребитель должен начать читать сообщение при запуске, я чувствую, что это свойство также не окажет никакого влияния на атомарный потребитель, указанный в статье. Это связано с тем, что в этой реализации вновь запущенный потребитель начинает читать сообщения со смещения, указанного в базе данных.

Верно ли я согласен с обеими мыслями?

Ответы [ 2 ]

0 голосов
/ 01 ноября 2019
  1. enable.auto.commit
    Когда потребитель перезапускается после сбоя, он начинает использовать раздел раздела со смещением, полученным из базы данных. Значение этого свойства будет использоваться при единственном прогоне потребителя без сбоев, как в случае любого другого сценария.

    Автоматическая фиксация делает потребительскую фиксацию каждые 5 секунд (значение по умолчанию auto.commit.interval.ms) или при каждом poll() вызовах.

    Ручная фиксация (enable.auto.commit=false) помогает избежать обработки сообщений. Например, если истекает 5-секундный таймер автоматической фиксации между чтением и обработкой сообщения, он может в итоге выполнить: (read, commit, process) в этой последовательности. И если потребитель завершает работу после коммита, без обработки сообщения (read,commit,crash), это сообщение никогда не будет обработано, потому что на следующем poll() потребитель получит следующее сообщение (так как фиксация прошла успешно). Мы можем предотвратить это, выполнив ручную фиксацию в следующем порядке: (read, process,commit).

    Однако при этом существует вероятность того, что потребитель может потерпеть крах после обработки без фиксации (read,process,crash). Это приведет к повторной обработке того же сообщения в следующем poll().

    Чтобы избежать этой дублирующей обработки, мы сохраняем смещение во внешней базе данных и извлекаем его при перезапуске потребителя. Обратите внимание, что, сохраняя смещение в базе данных и извлекая его при перезапуске потребителя, также следует избегать отсутствия обработки сообщения в случае последовательности (read,commit,crash), которая может возникнуть в случае автоматической фиксации.

    Таким образом, вкратце, ручная фиксация выполняетне служат какой-либо цели, когда мы храним смещение для внешней базы данных. Следовательно, мы можем установить enable.auto.commit в любое значение true или false. Однако, когда установлено значение false, мы не должны забывать фиксировать явно. В противном случае потребитель будет продолжать читать и обрабатывать одно и то же сообщение.

  2. auto.offset.reset

    Его значение будет иметь значение, если в базе данных нет значения смещения для данного раздела темы. ,Это произойдет, когда потребитель будет запущен впервые или база данных будет усечена. В этом случае нам нужно, чтобы потребитель начал потреблять с первого сообщения, которое не было использовано ни одним потребителем в его группе потребителей. Для этого нам нужно установить для свойства earliest.

0 голосов
/ 25 октября 2019

Если вы используете Kafka Stream, он поддерживает шаблон потока «ровно один раз», вы можете сослаться на

Шаблон потока «ровно один раз» - это просто возможность выполнить операцию чтения-записи-записи ровно один раз. Означает, что вы потребляете одно сообщение за раз, получаете процесс и публикуетесь в другой теме и фиксируете. Таким образом, commit будет обрабатываться Stream автоматически по одному сообщению за раз.

Если вы не используете конкретный вариант использования, для которого лучше использовать Kafka с предоставленным коммитом, в противном случае вам нужно вручную обработать множество сценариев сбоя, например, что произошло, если возникла проблема с подключением к БД .. и при обработкемиллион или записывает требуемый очень частый доступ к БД. Кафка уже хранит фиксированное смещение, что лучше с точки зрения производительности, не требует никакого внешнего подключения к БД.

_consumer_offsets хранит информацию о фиксированных смещениях для каждой темы, разделана группу потребителей. Который использует коммит для обновления деталей смещения по этой теме

Если у вас есть конкретный случай использования, где требуется атомарная транзакция, или вы хотите сохранить детали смещения рядом с вашим результатом, вы можете управлять внешним хранилищем смещений, как упоминалось здесь

enable.auto.commit: В случае использования внешнего управления смещением вы не используете сценарий фиксации, поэтому не имеет значения, если enable.auto.commitвключить или отключить. Вы будете продолжать получать сообщения, используя метод поиска (раздел TopicPartition, длинное смещение) и сохраняя смещение извне. В случае перезапуска начните извлекать последнее сохраненное смещение. Единственное влияние, если какая-то встроенная панель мониторинга, такая как Confluent Control-Center, Grafana и т. Д., Используемая для поддержки темы Kafka, не будет отражаться в случае, если не выполнить фиксацию вручную и enable.auto.commit false.

auto.offset.reset Да, это влияет только при первом запуске, но так как вы используете для извлечения сообщения из определенного раздела и смещения не влияет.

======================= Обновлено =======================

enable.auto.commit - если true (по умолчанию), периодически фиксировать смещение последнего сообщения, переданного приложению. Фиксированное смещение будет использовано при перезапуске процесса для определения того, где оно остановилось.

auto.commit.interval.ms - частота в миллисекундах, в течение которой смещения потребителя фиксируются (записываются) в смещение хранилища.

Примечание: , если enable.auto.commit false не будет использовать auto.commit.interval.ms

В случае enable.auto.commit true метод фиксациибудет получать вызов при каждом опросе, и, если auto.commit.interval.ms получит смещение, будет зафиксировано

  1. интервал опроса> интервал фиксации: смещение фиксации во время интервала опроса
  2. интервал опроса<интервал фиксации: метод фиксации будет вызывать каждый опрос, но смещение будет зафиксировано при последовательном опросе () после истечения интервала фиксации. </li>
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...