Как сохранить сообщение в базе данных и отправить ответ в тему в конечном итоге согласованным? - PullRequest
5 голосов
/ 03 июля 2019

У меня есть следующий потребитель rabbitMq:

Consumer consumer = new DefaultConsumer(channel) {
    @Override
     public void handleDelivery(String consumerTag, Envelope envelope, MQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            sendNotificationIntoTopic(message);
            saveIntoDatabase(message);
     }
};

Может возникнуть следующая ситуация:

  1. Сообщение было успешно отправлено в тему
  2. Соединение с базой данных потеряно, поэтому вставка базы данных не удалась.

В результате мы имеем несоответствие данных.

Ожидаемый результат: либо оба действия были успешно выполнены, либо оба не были выполнены вообще.

Какие-нибудь решения, как я могу достигнуть этого?

приписка

В настоящее время у меня есть следующая идея (пожалуйста, прокомментируйте)

Можно предположить, что брокер не потерял ни одного сообщения.

Мы должны подписаться на тему, которую хотим отправить.

  1. Сохранить запись в базе данных и установить поле status со значением 'pending'
  2. Попытка отправить данные в тему. Если отправка прошла успешно - обновите поле status со значением 'success'
  3. У нас должна быть запланированная работа, которая должна проверять строки с ожидающим статусом. На данный момент возможны 2 случая:
    3.1 Уведомление не было отправлено вообще
    3.2 Уведомление было отправлено, но сохранить в базе данных не удалось (вероятность очень низкая, но это возможно)

    Таким образом, мы должны как-то различать два случая: мы можем хранить сообщения из темы в коллекции, а задание может проверять, принято сообщение или нет. Поэтому, если задание обнаружило сообщение, соответствующее строке базы данных, мы должны обновить статус до «успех». В противном случае мы должны удалить запись из базы данных.

Я думаю, что моя идея имеет некоторые недостатки (например, если у нас есть многоузловое приложение, мы должны хранить сообщения в Hazelcast (или аналогах), но это дополнительная точка гипотетического сбоя)

Ответы [ 4 ]

2 голосов
/ 12 июля 2019

Вот пример Попробуйте отменить Подтвердите шаблон https://servicecomb.apache.org/docs/distributed_saga_3/, который должен быть способен решить вашу проблему. Вы должны допустить некоторую вероятность двойной передачи данных через очередь. Вот пример:

  1. Определение операции абстракции и присвоение идентификатора операции плюс отметка времени.
  2. Состояние записи Ожидает в базу данных (вы можете сделать это так же, как 1)
  3. Напишите прослушиватель, который опрашивает базу данных для всех операций с ожидающим статусом и старше «тайм-аута»
  4. Для каждой ожидающей операции отправляйте данные через очередь с назначенным идентификатором.
  5. Сторона получателя должна знать идентификатор, и если идентификатор был обработан, ничего не произойдет.

6A . Если вам нужно на 100% завершить операцию, вам нужна вторая очередь, где сторона получателя отправит сообщение с идентификатором - ВЫПОЛНЕНО. Если такая согласованность не требуется, пропустите этот шаг. В качестве альтернативы он может опубликовать ID -Failed причину сбоя.

. Отправляющая сторона либо ожидает сообщения от 6А, либо завершает операцию, записывая в базу данных статус ГОТОВО.

  • Как только истечет таймаут сертина или истечет определенный лимит повторных попыток. Вы пишете статус операции FAIL.
  • Вы можете отправить сообщение оператору на стороне получателя с откатом идентификатора.

Обратите внимание, что все эти шаги не связаны с техническими транзакциями. Вы можете сделать это с нетранзакционной базой данных.

То, что я написал, представляет собой вариант шаблона подтверждения отмены попытки, в котором каждый получатель сообщения должен знать, как управлять своими собственными данными.

0 голосов
/ 10 июля 2019

Если есть достаточно времени для изменения дизайна, рекомендуется использовать JTA-подобные API-интерфейсы для управления 2-фазной фиксацией.Даже weblogic и WebSphere поддерживают ресурс XA для двухфазной фиксации.

Если временная шкала меньше, рекомендуется выполнить следующее, чтобы уменьшить разрыв при сбое.

  • Тема отправки данных (без фиксации) (если тема недоступна, повторите попытку с интервалом)
  • Запись данных в БД
  • Фиксация БД
  • Фиксация темы

Здесь сбой произойдет только в случае сбоя шага 4.Это приведет к повторной отправке того же сообщения.Таким образом, принимающая система получит дубликат сообщения.Каждое сообщение имеет уникальный messageID и CorrelationID в структуре JMS2.0.Таким образом, поиск дубликатов довольно прост (но это должно быть выполнено в принимающей системе)

Оба случая будут работать и для кластерной среды.


Строго в вашем случае, следующие шаги могут помочь решить вашу проблему

Подпишите слушателя-слушателя-1 на вашу тему.

Process-1

  • Добавить запись БД со статусом «будет отправлено» для сообщения msg-1
  • Отправить сообщение msg-1 в тему.Повторите попытку отправки в случае сбоя какой-либо темы. Если шаг 2 завершился неудачно после определенной повторной попытки, процесс-1 должен повторно отправить msg-1 перед отправкой любых новых сообщений ИЛИ шаг-1 для отката

Listener-1

  • Используя подписанного слушателя, прочитайте ссылку (meesageID / correlationID) из темы, обновите статус БД до SENT и прочитайте / удалите сообщение из темы.Включите успешное чтение ссылки и обновление БД, в теме все еще есть сообщение.Так что при следующем прочтении обновится БД.Incase DB обновление успешно и удаление сообщения не удалось.Слушатель будет читать снова и пытается обновить сообщение, которое уже сделано.Таким образом, может быть проигнорировано после проверки.

Включите самого слушателя, тема будет иметь сообщения, пока слушатель не прочитает сообщения.До этого отправленные сообщения будут иметь статус «для отправки».

0 голосов
/ 12 июля 2019

Вот псевдокод того, как я это сделаю: (Предполагая, что слой дао имеет транзакционные возможности, а уровень обмена сообщениями - нет)

    //Start a transaction
    try {
                String message = new String(body, "UTF-8");
               // Ordering is important here as I'm assuming the database has commit and rollback capabilities, but the messaging system doesnt. 
                saveIntoDatabase(message);
                sendNotificationIntoTopic(message);

    } catch (MessageDeliveryException) {
        // rollback the transaction
        // Throw a domain specific exception
    }
   //commit the transaction

Сценарии
1. Если происходит сбой базы данных, сообщение не будет отправлено, поскольку исключение нарушит поток кода.
2. Если вызов базы данных выполнен успешно, а система обмена сообщениями не доставляет сообщение, перехватите исключение и откат изменений базы данных

Все действия, необходимые для регистрации и воспроизведения сбоев, могут быть за пределами этого метода

0 голосов
/ 03 июля 2019
  1. В строке сохранения базы данных слушателя с полем staus = 'pending'
  2. Другое задание (отдельный поток) получит все ожидающие строки из БД и последующие для каждой строки:
    2.1 отправить данные в тему
    2.2 сохранить в базу данных

Если мы потерпели неудачу на шаге 1 - все в порядке - данные в согласованном состоянии, потому что задание не будет ничего знать об этих данных

если мы потерпели неудачу на шаге 2.1 - нет проблем, следующий вызов задания попытается обработать его

если мы потерпели неудачу на шаге 2.2 - Если мы потерпели неудачу здесь - это означает, что при следующем вызове задания снова будут обрабатываться те же данные. С первого взгляда можно подумать, что это проблема. Но ваш потребитель должен быть идемпотентом - это означает, что он должен понимать, что сообщение уже обработано, и пропустить обработку. Это требование является следствием того, что все брокеры сообщений имеют гарантии, что сообщение будет доставлено как минимум ОДИН РАЗ. Таким образом, наши потребители должны быть готовы к дублированию сообщений в любом случае. Нет проблем снова.

...