Я использую Кафку уже несколько месяцев, и я понял, что некоторые из основных понятий мне пока не так понятны.Мое сомнение связано с отношением customerId, groupId и смещений.В нашем приложении нам нужно, чтобы Kafka работал с использованием парадигмы публикации - подписки, поэтому мы используем разные идентификаторы групп для каждого потребителя, которые генерируются случайным образом.
Раньше я думал, что при настройке auto.offset.reset = latest
мои потребители всегда будут получать сообщения, которые они еще не получили, но в последнее время я узнал , что не так .Это работает, только если у потребителя еще нет зафиксированных смещений.В любом другом случае потребитель будет продолжать получать сообщения со смещением, превышающим последнее зафиксированное смещение.
Поскольку я всегда создаю новых потребителей со случайными групповыми идентификаторами, я понял, что у моих потребителей «нет памяти», они - новые потребители, и у них никогда не будет зафиксированных смещений, поэтому политика auto.offset.reset = latest
всегда будет применяться.И вот тут начинаются мои сомнения. Предположим, следующий сценарий :
- У меня есть два клиентских приложения, A и B, с одним потребителем, каждое, работающее в режиме публикации - подписки (таким образом, с разными идентификаторами групп),Оба потребителя подписываются на тему
my-topic
.Значение auto.offset.reset
равно latest
для обоих потребителей. - Некоторые производители (или производители) публикуют сообщения M1, M2 и M3 в теме
my-topic
. - И A, и B получают M1, M2 и M3.
- Теперь я закрываю приложение B.
- Производители выдают сообщения M4 и M5.
- Приложение A получает сообщения M4 и M5.
- Теперь я перезапускаю приложение B. Помните,
groupId
является случайным, и я не устанавливаю никакой идентификатор потребителя, так что это означает, что это новый потребитель (верно?).Приложение B не получает никаких сообщений. - Производители публикуют сообщения M6 и M7.
- Оба приложения A и B получают сообщения M6 и M7.
Итак, подводя итог, если я не ошибаюсь, A получает все сообщения, но B пропустил M4 и M5.Я пробовал это с kafka-console-consumer.sh
, и он ведет себя так.
Итак, как я могу заставить приложение B получать сообщения, опубликованные во время его закрытия?Теперь, если я начну назначать тот же groupId, что и при первоначальном запуске, он будет читать сообщения M4 и M5, но это установка идентификатора группы.Можно ли также установить идентификатор потребителя и получить такое же поведение?
Или, другими словами, что понимается при повторном запуске одного и того же потребителя? Два потребителя - это один и тот же потребитель, еслиу них один и тот же идентификатор группы, один и тот же идентификатор потребителя, оба?
Кстати, идентификатор потребителя и свойство client.id совпадают?