Механизм повтора Rebus важен только при получении сообщений.Он работает путем создания «транзакции очереди» (*), а затем, если обработка сообщения завершается неудачно, сообщение возвращается в очередь.
Практически сразу после этого сообщение будет снова получено и попытанообрабатываются.Это означает, что между попытками доставки нет задержки.
Для каждой неудачной доставки счетчик увеличивается для идентификатора этого сообщения.Вот почему идентификатор сообщения необходим для работы Rebus, что также объясняет, почему ваше сообщение без идентификатора немедленно перемещается в очередь недоставленных сообщений.
Из-за разрозненного характера попыток доставки (толькосчетчик для каждого идентификатора сообщения хранится), нет лучшего места для подключения библиотеки повторов, например, Polly.
Если вы хотите, чтобы Pollize обрабатывал ваши сообщения, я предлагаю вам выполнять отдельные операции с политиками Polly - таким образом,у вас могут быть разные политики для обработки сбойных веб-запросов, сбоя передачи файлов на сетевых дисках и т. д. Я сам это часто делаю.
Чтобы избежать невозможности правильно отключить экземпляр вашей шины, если он находится вВ процессе очень длительной повторной попытки Полли, вы можете передать внутреннюю CancellationToken
от Rebus в свои исполнения Полли следующим образом:
public class PollyMessageHandler : IHandleMessages<SomeMessage>
{
static readonly IAsyncPolicy RetryPolicy = Policy
.Handle<Exception>()
.WaitAndRetryForeverAsync(_ => TimeSpan.FromSeconds(10));
readonly IMessageContext _messageContext;
public PollyMessageHandler(IMessageContext messageContext)
{
_messageContext = messageContext;
}
public async Task Handle(SomeMessage message)
{
var cancellationToken = _messageContext.GetCancellationToken();
await RetryPolicy.ExecuteAsync(DoStuffThatCanFail, cancellationToken);
}
async Task DoStuffThatCanFail(CancellationToken cancellationToken)
{
// do your risky stuff in here
}
}
(*) Фактический тип транзакции зависит от того, что поддерживаетсяна транспорте.
В MSMQ это объект MessageQueueTransaction
, в котором есть методы Commit()
и Rollback()
.
С RabbitMQ, служебной шиной Azure и другими это протокол на основе арендыгде сообщение становится невидимым в течение некоторого времени, а затем, если сообщение получает ACK в течение этого времени, то сообщение удаляется.В противном случае - если сообщение или NACKed, или если срок аренды истекает - сообщение снова всплывает и может быть снова получено другими потребителями.
С транспортом SQL это просто транзакция базы данных.