Кафка: Опрос после транзакции производителя не получает произведенные сообщения - PullRequest
0 голосов
/ 09 ноября 2018

У меня есть приложение "потреблять-преобразовывать-производить" с единовременной скемантикой в ​​Кафке. Фаза (транзакция) создает новые сообщения на ту же тему, которая затем используется (транзакция = read_committed). Существует только один поток, который делает это, и гарантируется, что опрос потребителей происходит после фиксации транзакции производителя. Прямо сейчас у меня есть только одно утверждение опроса для каждого потребления-преобразования-производства-раунда.

Контрольный пример

Когда я запускаю свой тестовый случай, иногда могут появляться сообщения, которые какой-либо другой производитель отправил (разборчиво) до того, как транзакция моего производителя будет зафиксирована. Тогда я испытываю следующее:

Мой единственный оператор опроса возвращает только это постороннее сообщение, но не мое сообщение, созданное буквально на мгновение назад, хотя транзакция последнего раунда завершилась успешно.

Вопросы

  1. Мне чего-то не хватает, чтобы моя транзакция была результатом раунд не виден потребителю следующего раунда?
  2. Нужно ли проводить несколько опросов, пока один опрос не вернет 0 записей, и это говорит мне о том, что все сообщения на сервере в этом разделе читать?
  3. Может ли Кафка не гарантировать, что все сообщения, находящиеся в данный момент в разделе, прочитаны? Может быть, там нет чего-то вроде «Я сейчас читаю этот раздел»?

Конфигурация

  • Транзакционный потребитель

    окончательная карта consumerConfig = new LinkedHashMap <> (); consumerConfig.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER); consumerConfig.put (ConsumerConfig.CLIENT_ID_CONFIG, ID); consumerConfig.put (ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID); consumerConfig.put (ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); consumerConfig.put (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, «последний»); consumerConfig.put (ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); consumerConfig.put (ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100"); consumerConfig.put (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName ()); consumerConfig.put (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName ());

  • Транзакционный производитель

    финальная карта providerConfig = new LinkedHashMap <> (); providerConfig.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER); providerConfig.put (ProducerConfig.TRANSACTIONAL_ID_CONFIG, ID); providerConfig.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName ()); providerConfig.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName ());

  • Время ожидания моего опроса составляет 2 с

  • Насколько я понимаю, транзакционные производители автоматически становятся идемпотентами и имеют acks = all
  • Мой тестовый сценарий только с одним брокером и одной репликацией. Но, конечно, я намерен использовать больше в производстве
  • Я использую Кафку 2.0
  • В моей теме только один раздел
  • Мой поток имеет собственную группу потребителей и назначен этому единственному разделу

1 Ответ

0 голосов
/ 09 ноября 2018

Для вашего понимания того, как работает poll, параметр, который мы передаем poll (), является интервалом времени ожидания и определяет, как долго будет блокироваться poll (), если данные недоступны в буфере потребителя. Если установлено значение 0, poll () вернется немедленно; в противном случае он будет ожидать указанное число миллисекунд для получения данных от посредника. Так что если вы настроили опрос на 0 миллисекунд, и в буфере данных нет данных, и вы не получите никаких данных.

Если говорить о том, что вы не получили недавно созданные данные, это зависит от вашей конфигурации производителя. Если созданное сообщение не имеет своих реплик и основано на параметре acks, сообщение будет доступно потребителю для потребления.

Например: если вы установили реплики как 3 и acks = all, если все репликаторы не признают лидера, что они получили сообщение, это сообщение не будет доступно для потребителя для потребления.

Переходя к вопросу, как вы можете узнать, прочитали ли вы весь раздел, если ваш опрос больше не дает никаких записей (при условии, что все остальные работают нормально), то это означает, что вы использовали все сообщения для этой темы. .

...