Я разработал приложение Azure Durable Functions, которое запускает новые сообщения очереди служебной шины.Он работает нормально, когда ошибок не возникает, но когда происходит ошибка в функции активности, он регистрирует, что произошел сбой, но сообщение навсегда ушло из очереди.Что может быть причиной, и как я могу предотвратить исчезновение сообщения из очереди при ошибке?
Вот воспроизводимый код, это код, сгенерированный из нового шаблона функции Azure в VS2017, только исключениедобавляется, когда городом является «Сиэтл», и вместо HttpTrigger это ServicebusTrigger.
[FunctionName("Test")]
public static async Task<List<string>> RunOrchestrator(
[OrchestrationTrigger] DurableOrchestrationContext context)
{
var outputs = new List<string>();
// Replace "hello" with the name of your Durable Activity Function.
outputs.Add(await context.CallActivityAsync<string>("Test_Hello", "Tokyo"));
outputs.Add(await context.CallActivityAsync<string>("Test_Hello", "Seattle"));
outputs.Add(await context.CallActivityAsync<string>("Test_Hello", "London"));
// returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
return outputs;
}
[FunctionName("Test_Hello")]
public static string SayHello([ActivityTrigger] string name, ILogger log)
{
log.LogInformation($"Saying hello to {name}.");
if (name == "Seattle")
throw new Exception("An error occurs");
return $"Hello {name}!";
}
[FunctionName("Test_HttpStart")]
public static async Task ServiceBusStart(
[ServiceBusTrigger("somequeue", Connection = "ServiceBusQueueListenerConnectionString")]string queuemsg,
[OrchestrationClient]DurableOrchestrationClient starter,
ILogger log)
{
// Function input comes from the request content.
var msg = JsonConvert.DeserializeObject<IncomingMessage>(queuemsg);
string instanceId = await starter.StartNewAsync("Test", msg);
log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
}
Обновление: когда у меня есть исключение в клиентской функции Orchestration, он делает правильные действия, такие как повторная попытка и установкасообщение в очереди недоставленных сообщений, если повторная попытка не удалась x раз.
Так что мне удалось обойти это, обновив клиентскую функцию с помощью этого цикла while, проверяя состояние «сбой / прекращено / отменено».
[FunctionName("Test_HttpStart")]
public static async Task ServiceBusStart(
[ServiceBusTrigger("somequeue", Connection = "ServiceBusQueueListenerConnectionString")]string queuemsg,
[OrchestrationClient]DurableOrchestrationClient starter,
ILogger log)
{
// Function input comes from the request content.
var msg = JsonConvert.DeserializeObject<IncomingMessage>(queuemsg);
string instanceId = await starter.StartNewAsync("Test", msg);
log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
var status = await starter.GetStatusAsync(instanceId);
while (status.RuntimeStatus != OrchestrationRuntimeStatus.Completed)
{
System.Threading.Thread.Sleep(1000);
status = await starter.GetStatusAsync(instanceId);
if (status.RuntimeStatus == OrchestrationRuntimeStatus.Failed
|| status.RuntimeStatus == OrchestrationRuntimeStatus.Terminated
|| status.RuntimeStatus == OrchestrationRuntimeStatus.Canceled)
{
throw new Exception("Orchestration failed with error: " + status.Output);
}
}
}
Однако мне это кажется взломом, и я не видел такого типа кода ни в одном примере кода MS.Я полагаю, что об этом должна заботиться структура надежных функций.Есть ли другой способ заставить триггер служебной шины работать в надежных функциях?