Политика Rebus Retry - PullRequest
       30

Политика Rebus Retry

0 голосов
/ 16 мая 2019

У меня есть вопросы по политике повторных попыток Rebus ниже:

Configure.With(...)
    .Options(b => b.SimpleRetryStrategy(maxDeliveryAttempts: 2))
    .(...)

https://github.com/rebus-org/Rebus/wiki/Automatic-retries-and-error-handling#customizing-retries

1 Можно ли его использовать как для Publiser (сообщения в очереди), так и для подписчика (сообщения очереди)?

2 У меня есть подписчик, который не может удалить сообщение из очереди. Таким образом сообщение отправляется в очередь ошибок.

Ниже приведена ошибка при помещении сообщения в очередь ошибок. Но я не вижу записи для повторной попытки.

[ERR] Rebus.Retry.PoisonQueues.PoisonQueueErrorHandler (Thread #9): Moving messa
ge with ID "<unknown>" to error queue "poison"
Rebus.Exceptions.RebusApplicationException: Received message with empty or absen
t 'rbs2-msg-id' header! All messages must be supplied with an ID . If no ID is p
resent, the message cannot be tracked between delivery attempts, and other stuff
 would also be much harder to do - therefore, it is a requirement that messages
be supplied with an ID.

Можно ли определять и хранить пользовательские журналы для каждой попытки, не в IErrorHandler?

3 Сколько времени между повторными попытками ожидания по умолчанию?

4 Можно ли определить настраиваемое время ожидания для каждой попытки (не в IErrorHandler)? Если да, то поддерживается ли Полли для этого сканарио? как показано ниже:

Random jitterer = new Random(); 
Policy
  .Handle<HttpResponseException>() // etc
  .WaitAndRetry(5,    // exponential back-off plus some jitter
      retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt))  
                    + TimeSpan.FromMilliseconds(jitterer.Next(0, 100)) 
  );

https://docs.microsoft.com/en-us/dotnet/standard/microservices-architecture/implement-resilient-applications/implement-http-call-retries-exponential-backoff-polly

Обновление

Как проверить политику повторных попыток?

Ниже приведено то, что я пробовал на основе приведенного ниже кода:

public class StringMessageHandler : IHandleMessages<String>
{   
    public async Task Handle(String message) 
    {
          //retry logic with Polly here
    }   
}

Я отправил недопустимое сообщение строкового типа в строковую тему, однако Handle (String message) вообще не вызывается.

1 Ответ

1 голос
/ 16 мая 2019

Механизм повтора Rebus важен только при получении сообщений.Он работает путем создания «транзакции очереди» (*), а затем, если обработка сообщения завершается неудачно, сообщение возвращается в очередь.

Практически сразу после этого сообщение будет снова получено и попытанообрабатываются.Это означает, что между попытками доставки нет задержки.

Для каждой неудачной доставки счетчик увеличивается для идентификатора этого сообщения.Вот почему идентификатор сообщения необходим для работы Rebus, что также объясняет, почему ваше сообщение без идентификатора немедленно перемещается в очередь недоставленных сообщений.

Из-за разрозненного характера попыток доставки (толькосчетчик для каждого идентификатора сообщения хранится), нет лучшего места для подключения библиотеки повторов, например, Polly.

Если вы хотите, чтобы Pollize обрабатывал ваши сообщения, я предлагаю вам выполнять отдельные операции с политиками Polly - таким образом,у вас могут быть разные политики для обработки сбойных веб-запросов, сбоя передачи файлов на сетевых дисках и т. д. Я сам это часто делаю.

Чтобы избежать невозможности правильно отключить экземпляр вашей шины, если он находится вВ процессе очень длительной повторной попытки Полли, вы можете передать внутреннюю CancellationToken от Rebus в свои исполнения Полли следующим образом:

public class PollyMessageHandler : IHandleMessages<SomeMessage>
{
    static readonly IAsyncPolicy RetryPolicy = Policy
        .Handle<Exception>()
        .WaitAndRetryForeverAsync(_ => TimeSpan.FromSeconds(10));

    readonly IMessageContext _messageContext;

    public PollyMessageHandler(IMessageContext messageContext)
    {
        _messageContext = messageContext;
    }

    public async Task Handle(SomeMessage message) 
    {
        var cancellationToken = _messageContext.GetCancellationToken();

        await RetryPolicy.ExecuteAsync(DoStuffThatCanFail, cancellationToken);
    }

    async Task DoStuffThatCanFail(CancellationToken cancellationToken)
    {
        // do your risky stuff in here
    }
}

(*) Фактический тип транзакции зависит от того, что поддерживаетсяна транспорте.

В MSMQ это объект MessageQueueTransaction, в котором есть методы Commit() и Rollback().

С RabbitMQ, служебной шиной Azure и другими это протокол на основе арендыгде сообщение становится невидимым в течение некоторого времени, а затем, если сообщение получает ACK в течение этого времени, то сообщение удаляется.В противном случае - если сообщение или NACKed, или если срок аренды истекает - сообщение снова всплывает и может быть снова получено другими потребителями.

С транспортом SQL это просто транзакция базы данных.

...