Шаблон для отправки сообщений в темы при изменении состояния сущности - PullRequest
0 голосов
/ 08 мая 2020

Мы используем Azure Service Bus для уведомления подписчиков, когда определенный объект в нашем приложении перешел в определенное состояние. Прямо сейчас мы делаем это сразу после вызова dbContext.SaveChangesAsyn c ():

  1. dbContext.SaveChangesAsync()

  2. topicClient.SendAsync(someMessage)

Проблема, с которой я столкнулся, заключается в следующем: скажем, dbContext.SaveChangesAsyn c () проходит нормально, но по какой-то причине вызов topicClient.SendAsyn c () выдает исключение. Теперь подписчики на этот topi c не будут знать об изменении состояния объекта.

Я пробовал использовать TransactionScope, но это не сработало, потому что, как я понял, Azure не используйте DT C.

(я мог бы изменить порядок двух вышеуказанных шагов, но тогда, если сообщение отправляется нормально, а сохранение не удается, сообщение содержит поддельные данные.)

Есть ли у кого-нибудь предложения по решению этой проблемы? Кажется, это должно быть обычное дело, но я не могу найти ничего в Интернете. Если бы кто-то мог указать мне правильное направление, я был бы признателен.

Заранее спасибо.

Ответы [ 3 ]

1 голос
/ 10 мая 2020

Как вы уже отметили, порядок операций не имеет значения, потому что нет перекрывающихся транзакций между двумя службами, базой данных и службой обмена сообщениями. Если хранилище данных, которое вы используете, поддерживает транзакции (например, Azure SQL сервер), вы можете обойтись без двухфазной фиксации и изучить возможность реализации шаблона Outbox .

NServiceBus предоставляет шаблон как элемент . Вы можете загрузить исходящий образец , который показывает, как его использовать с RabbitMQ. Транспортное средство можно заменить на Azure Транспортная шина обслуживания , чтобы удовлетворить ваши требования.

Раскрытие информации: Я участвую в NServiceBus.

0 голосов
/ 08 мая 2020

Это не прямой ответ на проблему, с которой вы столкнулись, но я расскажу, как мы решили аналогичную проблему в одном из наших приложений, где нам приходилось записывать данные в отдельные таблицы в Azure Table Storage. Поскольку мы записывали данные в отдельные таблицы, мы не могли использовать функциональность Entity Batch Transaction, доступную в Azure Table Storage.

Мы решили эту проблему, реализовав что-то похожее на eventual consistency pattern.

Что мы делаем, так это вместо того, чтобы сохранять данные непосредственно в таблицах (что означало бы выполнение нескольких сетевых запросов, любой из которых может завершиться ошибкой), мы отправляем данные, которые необходимо сохранить, в очередь (мы использовали очередь хранения) . Если мы сможем сохранить данные в очереди, это означает, что данные в конечном итоге будут доступны.

Затем мы написали функцию Azure Queue Triggered Function. В этой функции мы сохраняем данные в таблицах, которые нам нужно сохранить. После успешного выполнения всех операций сообщение автоматически удаляется средой выполнения функции. В случае сбоя какой-либо операции сообщение будет снова отправлено в очередь и снова будет удалено из очереди.

Теперь важно понять, что эти методы сохранения должны быть идемпотентными. Допустим, мы записываем в 3 таблицы и операция записи для 1-й таблицы завершается успешно, но операция записи для 2-й таблицы завершается неудачно. В следующий раз, когда функция будет вызвана, она снова попытается выполнить запись в 1-ю таблицу, и код должен уметь правильно с этим справиться.

0 голосов
/ 08 мая 2020

Вы можете реализовать шаблон под названием очередь подозрительных сообщений или очередь недоставленных сообщений . Есть и другие известные синонимы, такие как retry queue.

Идея состоит в том, чтобы попробовать операцию, и если она не удалась, поместить некоторую метаинформацию в очередь и повторить попытку позже. В вашем случае после вызова dbContext.SaveChangesAsync() вы можете поместить всю необходимую информацию в очередь durable и иметь обработчик, который будет обрабатывать эту очередь, и в каком-то виде ProcessMessage() вы можете обрабатывать вызовы topicClient.SendAsync(someMessage).
Например, если вызов служебной шины завершился неудачно, вы можете вернуть элемент очереди обратно в очередь для последующей обработки.

Конечно, вы можете выделить указанную очередь только для неудачных вызовов topicClient.SendAsync(someMessage), что позволяет значительно уменьшить его размер. Порядок выполнения операций не имеет значения, так как вы можете поместить любую метаинформацию, которая позволяет сначала позвонить topicClient.SendAsync(someMessage), а затем попробовать обновить базу данных.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...