Сообщение из очереди сервисной шины исчезает при ошибке в функции активности - PullRequest
0 голосов
/ 31 декабря 2018

Я разработал приложение Azure Durable Functions, которое запускает новые сообщения очереди служебной шины.Он работает нормально, когда ошибок не возникает, но когда происходит ошибка в функции активности, он регистрирует, что произошел сбой, но сообщение навсегда ушло из очереди.Что может быть причиной, и как я могу предотвратить исчезновение сообщения из очереди при ошибке?

Вот воспроизводимый код, это код, сгенерированный из нового шаблона функции Azure в VS2017, только исключениедобавляется, когда городом является «Сиэтл», и вместо HttpTrigger это ServicebusTrigger.

            [FunctionName("Test")]
    public static async Task<List<string>> RunOrchestrator(
        [OrchestrationTrigger] DurableOrchestrationContext context)
    {
        var outputs = new List<string>();

        // Replace "hello" with the name of your Durable Activity Function.
        outputs.Add(await context.CallActivityAsync<string>("Test_Hello", "Tokyo"));
        outputs.Add(await context.CallActivityAsync<string>("Test_Hello", "Seattle"));
        outputs.Add(await context.CallActivityAsync<string>("Test_Hello", "London"));

        // returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
        return outputs;
    }

    [FunctionName("Test_Hello")]
    public static string SayHello([ActivityTrigger] string name, ILogger log)
    {
        log.LogInformation($"Saying hello to {name}.");
        if (name == "Seattle")
            throw new Exception("An error occurs");
        return $"Hello {name}!";
    }

    [FunctionName("Test_HttpStart")]
    public static async Task ServiceBusStart(
        [ServiceBusTrigger("somequeue", Connection = "ServiceBusQueueListenerConnectionString")]string queuemsg,
        [OrchestrationClient]DurableOrchestrationClient starter,
        ILogger log)
    {
        // Function input comes from the request content.
        var msg = JsonConvert.DeserializeObject<IncomingMessage>(queuemsg);
        string instanceId = await starter.StartNewAsync("Test", msg);
        log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
    }

Обновление: когда у меня есть исключение в клиентской функции Orchestration, он делает правильные действия, такие как повторная попытка и установкасообщение в очереди недоставленных сообщений, если повторная попытка не удалась x раз.

Так что мне удалось обойти это, обновив клиентскую функцию с помощью этого цикла while, проверяя состояние «сбой / прекращено / отменено».

    [FunctionName("Test_HttpStart")]
    public static async Task ServiceBusStart(
        [ServiceBusTrigger("somequeue", Connection = "ServiceBusQueueListenerConnectionString")]string queuemsg,
        [OrchestrationClient]DurableOrchestrationClient starter,
        ILogger log)
    {
        // Function input comes from the request content.
        var msg = JsonConvert.DeserializeObject<IncomingMessage>(queuemsg);
        string instanceId = await starter.StartNewAsync("Test", msg);
        log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

        var status = await starter.GetStatusAsync(instanceId);

        while (status.RuntimeStatus != OrchestrationRuntimeStatus.Completed)
        {
            System.Threading.Thread.Sleep(1000);
            status = await starter.GetStatusAsync(instanceId);
            if (status.RuntimeStatus == OrchestrationRuntimeStatus.Failed 
                || status.RuntimeStatus == OrchestrationRuntimeStatus.Terminated
                || status.RuntimeStatus == OrchestrationRuntimeStatus.Canceled)
            {
                throw new Exception("Orchestration failed with error: " + status.Output);
            }
        }

    }

Однако мне это кажется взломом, и я не видел такого типа кода ни в одном примере кода MS.Я полагаю, что об этом должна заботиться структура надежных функций.Есть ли другой способ заставить триггер служебной шины работать в надежных функциях?

1 Ответ

0 голосов
/ 06 января 2019

Такое поведение является второстепенным.Запуск оркестровки происходит асинхронно - т. Е. API StartNewAsync не будет автоматически ожидать запуска или завершения оркестровки.Внутренне StartNewAsync просто помещает сообщение в очередь хранилища Azure и записывает запись в таблицу хранилища Azure.Если это произойдет успешно, то ваша функция служебной шины продолжит работу и завершится успешно, и в этот момент сообщение будет удалено.

Ваш обходной путь приемлем, если вам действительно нужно сообщение очереди служебной шины, чтобы повторить попытку, но яВопрос, почему вам нужно это сделать.Сама оркестровка может самостоятельно управлять повторными попытками, не полагаясь на служебную шину.Например, вы можете использовать CallActivityWithRetryAsync для внутренних попыток внутри оркестровки.

См. Раздел Обработка ошибок в документации по долговременным функциям.

...