Как обеспечить единовременную семантику при обработке сообщений кафки в Apache Storm - PullRequest
0 голосов
/ 15 октября 2019

Мне понадобилась ровно однажды доставка в моем приложении. Я изучил kafka и понял, что чтобы сообщение было создано ровно один раз, я должен установить idempotence=true в конфигурации производителя. Это также устанавливает acks=all, заставляя производителя отправлять сообщения до тех пор, пока все реплики не подтвердят его. Чтобы гарантировать, что потребитель не выполнит дублирующую обработку или не оставит какое-либо сообщение необработанным, рекомендуется зафиксировать вывод обработки и смещение во внешней базе данных в одной транзакции базы данных, чтобы либо оба они были сохранены, либо ни одно из них не избежало дублирования и не обрабатывалось.

В потребителе сообщение остается обработанным, если потребитель сначала его фиксирует, но происходит сбой перед его обработкой, и сообщение обрабатывается более одного раза, если потребители сначала его обрабатывают, но перед его отправкой происходит сбой.

Q1. Теперь я догадывался, как я могу подражать тому же Apache Storm. Полагаю, что когда сообщение будет создано, можно установить idemptence=true в KafkaBolt. Я прав?

Я догадывался, как можно обеспечить обработку пропущенных и дублированных сообщений в Storm. Например, на этой странице документа говорится, что если я закреплю кортеж (передав его в качестве первого параметра в OutputCollector.emit()), а затем передам кортеж в OutputCollector.ack() или OutputCollector.fail(), Storm обеспечит потерю данных. Это то, что он точно говорит:

Теперь, когда вы понимаете алгоритм надежности, давайте рассмотрим все случаи сбоев и посмотрим, как в каждом случае Storm избегает потери данных:

  • Кортеж не получен, потому что задача умерла: В этом случае идентификаторы кортежа носика в корне деревьев для неудачного кортежа истечут и будут воспроизведены.

  • Задача Acker умирает: В этом случае для всех наборов-носителей, которые отслеживал Acker, время ожидания истекает и воспроизводится.

  • Задача носика умирает: В этом случае источник, с которым разговаривает носик, отвечает за воспроизведение сообщений. Например, такие очереди, как Kestrel и RabbitMQ, будут помещать все ожидающие сообщения обратно в очередь при отключении клиента.

Q2. Полагаю, это гарантирует, чтосообщение не остается необработанным, но не избегает повторной обработки сообщений. Я прав с этим? Также есть что-нибудь еще, что Storm предлагает, чтобы точно пропустить семантику, такую ​​как kafka, которую я пропускаю?

1 Ответ

1 голос
/ 18 октября 2019

Относительно Q1: Да, вы можете получить то же поведение от KafkaBolt, установив это свойство, KafkaBolt просто оборачивает KafkaProducer.

Что касается семантики на стороне потребления, у вас есть то же самоеварианты с Storm, как вы делаете с Kafka. Когда вы читаете сообщение от Kafka, вы можете выбрать фиксацию до или после выполнения вашей обработки (например, запись в базу данных). Если вы сделаете это раньше, и программа потерпит крах, вы потеряете сообщение. Давайте назовем это at-most-once processing. Если вы сделаете это после, вы рискуете обработать одно и то же сообщение дважды, если после обработки произойдет сбой программы, но до принятия, под названием at-least-once processing.

Итак, что касается Q2: Да, использование привязанных кортежей и блокировка обеспечатВы с at-least-once семантикой. Если вы не используете привязанный кортеж, вы получите at-most-once.

Да, есть еще что-то, что Storm предлагает для точного обеспечения семантики, называемой Trident, но это требует от вас писать свою топологию по-другому, и ваше хранилище данных должно бытьадаптированы к нему, так что дедупликация сообщений может произойти. См. Документацию по адресу https://storm.apache.org/releases/2.0.0/Trident-tutorial.html.

Также просто предупреждаю вас: когда в документации по Storm (или Kafka) говорится о семантике «точно один раз», существуют некоторые предположения относительно того, какой тип обработки вы будете выполнять. Например, когда документы Storm о Trident говорят ровно один раз, есть предположение, что вы адаптируете свою базу данных, чтобы при получении сообщения вы могли решить, было ли оно сохранено. Когда документация Кафки говорит об одномоментном предположении, предполагается, что ваша обработка будет считывать данные из Кафки, выполнять некоторые вычисления (скорее всего без побочных эффектов) и записывать обратно в Кафку.

Это просто говорит о том, что для некоторых типов обработки вам все равно может потребоваться выбор между at-least-once и at-most-once. Если вы можете сделать свою обработку идемпотентной, at-least-once - хороший вариант.

Наконец, если ваша обработка соответствует модели «читать из Kafka, делать вычисления, записывать в Kafka», вы, вероятно, можете получить более хорошую семантику изKafka Streams чем Storm, так как Storm не может обеспечить семантику, которую Кафка может предоставить в этом случае точно.

...