Идентификатор потребителя и идентификатор группы в Кафке: что делает двух потребителей одинаковыми - PullRequest
0 голосов
/ 16 мая 2019

Я использую Кафку уже несколько месяцев, и я понял, что некоторые из основных понятий мне пока не так понятны.Мое сомнение связано с отношением customerId, groupId и смещений.В нашем приложении нам нужно, чтобы Kafka работал с использованием парадигмы публикации - подписки, поэтому мы используем разные идентификаторы групп для каждого потребителя, которые генерируются случайным образом.

Раньше я думал, что при настройке auto.offset.reset = latest мои потребители всегда будут получать сообщения, которые они еще не получили, но в последнее время я узнал , что не так .Это работает, только если у потребителя еще нет зафиксированных смещений.В любом другом случае потребитель будет продолжать получать сообщения со смещением, превышающим последнее зафиксированное смещение.

Поскольку я всегда создаю новых потребителей со случайными групповыми идентификаторами, я понял, что у моих потребителей «нет памяти», они - новые потребители, и у них никогда не будет зафиксированных смещений, поэтому политика auto.offset.reset = latest всегда будет применяться.И вот тут начинаются мои сомнения. Предположим, следующий сценарий :

  1. У меня есть два клиентских приложения, A и B, с одним потребителем, каждое, работающее в режиме публикации - подписки (таким образом, с разными идентификаторами групп),Оба потребителя подписываются на тему my-topic.Значение auto.offset.reset равно latest для обоих потребителей.
  2. Некоторые производители (или производители) публикуют сообщения M1, M2 и M3 в теме my-topic.
  3. И A, и B получают M1, M2 и M3.
  4. Теперь я закрываю приложение B.
  5. Производители выдают сообщения M4 и M5.
  6. Приложение A получает сообщения M4 и M5.
  7. Теперь я перезапускаю приложение B. Помните, groupId является случайным, и я не устанавливаю никакой идентификатор потребителя, так что это означает, что это новый потребитель (верно?).Приложение B не получает никаких сообщений.
  8. Производители публикуют сообщения M6 и M7.
  9. Оба приложения A и B получают сообщения M6 и M7.

Итак, подводя итог, если я не ошибаюсь, A получает все сообщения, но B пропустил M4 и M5.Я пробовал это с kafka-console-consumer.sh, и он ведет себя так.

Итак, как я могу заставить приложение B получать сообщения, опубликованные во время его закрытия?Теперь, если я начну назначать тот же groupId, что и при первоначальном запуске, он будет читать сообщения M4 и M5, но это установка идентификатора группы.Можно ли также установить идентификатор потребителя и получить такое же поведение?

Или, другими словами, что понимается при повторном запуске одного и того же потребителя? Два потребителя - это один и тот же потребитель, еслиу них один и тот же идентификатор группы, один и тот же идентификатор потребителя, оба?

Кстати, идентификатор потребителя и свойство client.id совпадают?

1 Ответ

1 голос
/ 16 мая 2019

Два потребителя входят в одну группу, если они имеют одинаковую настройку group.id.

Я не совсем уверен, что вы имеете в виду под consumerId. Начиная с Kafka 2.2, в пользовательских конфигурациях .

такого поля не существует.

Если вы говорите о client.id, этот параметр не имеет функционального эффекта, он используется только для маркировки запросов, чтобы они могли быть сопоставлены в журнале брокера при необходимости.

Когда вы запускаете потребителя с помощью auto.offset.reset=latest, если не существует зафиксированных смещений, потребитель перезапустит потребление с конца журнала. Таким образом, он будет получать только сообщения, созданные после его запуска. Таким образом, в вашем сценарии вы правы, он никогда не получит M4 и M5.

Если вы хотите использовать все сообщения, вам нужно сохранить то же самое group.id. В этом случае auto.offset.reset будет применяться только при первом запуске клиента. Таким образом, когда ваш потребитель перезапустится, он поймет, где он был, когда он остановился.

...