Если сообщение не может быть обработано в функции Azure ServiceBusTrigger, как я могу отложить обработку этого сообщения на x минут? - PullRequest
0 голосов
/ 05 сентября 2018

У меня есть функция Azure, которая читает тему ServiceBus и вызывает стороннюю службу. Если служба не работает, я хотел бы подождать 5 минут, прежде чем повторить попытку вызова с тем же сообщением. Как я могу добавить задержку, чтобы функция Azure не оставляла сообщение и не сразу возвращала его обратно?

public static void Run([ServiceBusTrigger("someTopic", 
     "someSubscription", AccessRights.Manage, Connection = 
     "ServiceBusConnection")] BrokeredMessage message) 
{
     CallService(bodyOfBrokeredMessage); //service is down

     //How do I add a delay so the message won't be reprocessed immediately thus quickly exhausting it's max delivery count?
}

Ответы [ 2 ]

0 голосов
/ 05 сентября 2018

Как сказал Джош, вы можете просто клонировать исходное сообщение, настроить запланированное время постановки в очередь, отправить клон и завершить оригинал.

Жаль, что отправка клона и завершение оригинала не является атомарной операцией, поэтому очень маловероятно, что мы увидим оригинал снова в случае сбоя процесса обработки в самый неподходящий момент.

И другая проблема заключается в том, что DeliveryCount на клоне будет всегда равным 1 , потому что это совершенно новое сообщение. Таким образом, мы могли бы бесконечно повторять и никогда не зацикливаться на этом сообщении.

К счастью, это можно исправить, добавив наш счетчик повторных отправок в качестве свойства сообщения:

[FunctionName("DelayMessage")]
public static async Task DelayMessage([ServiceBusTrigger("MyQueue", AccessRights.Listen, Connection = "MyConnection")]BrokeredMessage originalMessage,
            [ServiceBus("MyQueue", AccessRights.Send, Connection = "MyConnection")]IAsyncCollector<BrokeredMessage> newMessages,TraceWriter log)
{
     //handle any kind of error scenerio
     int resubmitCount = originalMessage.Properties.ContainsKey("ResubmitCount") ?  (int)originalMessage.Properties["ResubmitCount"] : 0;
     if (resubmitCount > 5)
     {
         Console.WriteLine("DEAD-LETTERING");
         originalMessage.DeadLetter("Too many retries", $"ResubmitCount is {resubmitCount}");
     }
     else
     {
         var newMessage = originalMessage.Clone();
         newMessage.ScheduledEnqueueTimeUtc = DateTime.UtcNow.AddMinutes(5);
         await newMessages.AddAsync(newMessage);
     }
}

Для более подробной информации, вы можете обратиться к этой статье .

Кроме того, довольно легко реализовать следующий шаблон ожидания / повтора / удаления очереди в LogicApp, так как этот тип управления потоком - это именно то, для чего были разработаны LogicApps. Пожалуйста, обратитесь к этому SO поток .

0 голосов
/ 05 сентября 2018

Один из вариантов - создать новое сообщение и отправить его в очередь, но в будущем установите ScheduledEnqueueTimeUtc на пять минут.

        [FunctionName("DelayMessage")]
        public static async Task DelayMessage(
            [ServiceBusTrigger("MyQueue", AccessRights.Listen, Connection = "MyConnection")]BrokeredMessage originalMessage,
            [ServiceBus("MyQueue", AccessRights.Send, Connection = "MyConnection")]IAsyncCollector<BrokeredMessage> newMessages,
            TraceWriter log)
        {
            //handle any kind of error scenerio

            var newMessage = originalMessage.Clone();

            newMessage.ScheduledEnqueueTimeUtc = DateTime.UtcNow.AddMinutes(5);

            await newMessages.AddAsync(newMessage);

        }
...