Правильно ли использовать базу данных в качестве хранилища для состояний сообщений, потребляемых кафкой? - PullRequest
2 голосов
/ 27 июня 2019

В настоящее время я реализовал приемник kafka, который работает следующим образом:

Внутри цикла while:

  1. Использование сообщения от kafka
  2. Поместить использованное сообщение в отдельный файл.задача для обработки, чтобы основной поток и цикл потребителя не блокировались. 2.1. Передать сообщение только в случае успешной обработки или превышения числа попыток обработки.

Шаг # 2.1 может занять от 1 секундадо 6 часов до завершения .

Проблема заключается в том, что в случае сбоя приложения и наличия задач, которые не были выполнены, при перезапуске приложения (или даже при перебалансировке) эти сообщения будут использоваться и обрабатываться снова.

Я не хочу автоматически фиксировать смещения, потому что это гарантирует доставку только один раз.Я думал об использовании базы данных в качестве хранилища для состояний сообщений и реализации потребителя следующим образом:

Внутри цикла while:

  1. Использование сообщения от kafka
  2. Проверка БД, если такое сообщение существует

    • Если сообщение существует в БД и состояние «завершено», то зафиксировать сообщение

    • Если сообщение существуетв дБ, но состояние «в процессе», затем перейдите к шагу № 4 напрямую

    • Если сообщение не существует, то перейдите к шагу # 3
  3. Сохранить сообщение в базе данных с состоянием «в процессе»

  4. Поместить использованное сообщение в отдельную задачу для обработки, чтобы основной поток и цикл потребителя не блокировались. 4.1. Передать сообщение и изменить состояние вдБ до «выполнено» только при успешной обработке или превышении числа попыток обработки.

Я не уверен, что использование дБ - правильный подход, потому что, если у меня много сообщений, он замедлитсявниз потребителя.Можете ли вы дать мне какие-либо предложения о том, как правильно реализовать потребителя, чтобы каждое сообщение обрабатывалось только один раз?

1 Ответ

2 голосов
/ 27 июня 2019

Ваш потребитель должен взять задачу из потока (Kafka), чтобы поток больше не содержал эту задачу. Если ваш рабочий узел дает сбой во время выполнения задачи, вам необходимо реализовать избыточность / обработку ошибок, то есть глобальную обработку исключений и постоянное временное хранилище. Таким образом, я бы не рекомендовал хранить задачи в базе данных вдоль потока, однако если вы собираетесь это сделать, то с таким же успехом вы можете создать таблицу в Kafka, поскольку они постоянны.

При обработке ошибок стратегия реализации зависит от вас, так как есть несколько способов, которыми вы могли бы с этим справиться, то есть, если узел аварийно завершает работу, тогда готово задание обратно в поток, готовое для захвата другим узлом, или вы можете просто зарегистрируйте задачу и оповестите пользователя о сбое задачи.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...