У меня есть функция Azure Durable с QueueTrigger.А внутри у меня есть несколько функций Azure, которые вызываются вместе с суб-оркестровкой.Вот код
var processKatartItem = context.GetInput<KatartItem>();
if (!processKatartItem.ProcessLeadTime && !processKatartItem.ProcessPrice && !processKatartItem.ProcessStock)
{
await context.CallActivityWithRetryAsync("LLGFeedPreProductMessageProcessor", new RetryOptions(TimeSpan.FromSeconds(1), 5), processKatartItem);
}
var tasks = new Task[processKatartItem.vardatas.Count];
for (var i = 0; i < processKatartItem.vardatas.Count; i++)
{
tasks[i] = context.CallSubOrchestratorWithRetryAsync("LLGFeedProductOrchestrator", new RetryOptions(TimeSpan.FromSeconds(1), 5), new ProductProcessingModel
{
ProcessKatartItem = processKatartItem,
ProcessVardata = processKatartItem.vardatas[i]
});
}
await Task.WhenAll(tasks);
await context.CallActivityWithRetryAsync("LLGFeedFinalizeMessage", new RetryOptions(TimeSpan.FromSeconds(1), 5), processKatartItem);
А вот код для SubOrchestrator
var processModel = context.GetInput<ProductProcessingModel>();
await context.CallActivityWithRetryAsync("LLGFeedProductSpecificationProcessor", new RetryOptions(TimeSpan.FromSeconds(1), 5), processModel);
await context.CallActivityWithRetryAsync("LLGFeedProductMessageProcessor", new RetryOptions(TimeSpan.FromSeconds(1), 5), processModel);
await context.CallActivityWithRetryAsync("LLGFeedProductMappingProcessor", new RetryOptions(TimeSpan.FromSeconds(1), 5), processModel);
await context.CallActivityWithRetryAsync("LLGFeedProductPriceProcessor", new RetryOptions(TimeSpan.FromSeconds(1), 5), processModel);
Как это структурировано, мы обрабатываем сообщение и его входные данные в базу данных, а затем запускаем MessageFinalization, которыйиндексирует сообщение / продукт в Elastic Search, вызывая API на нашем веб-сайте.Мы помещаем в очередь большое количество сообщений и затем обрабатываем их.
Проблема, с которой я столкнулся, заключается в том, что Durable Function выполняет первые несколько функций Azure, а затем ожидает последние несколько.Вот скриншот с панели инструментов WebJobs
ТАК, как вы можете видеть, что было обработано много сообщений, но не был выполнен 1 экземпляр для LLGFeedFinalizeMessage,Есть ли способ обеспечить выполнение всего конвейера вместо первых нескольких функций вначале, а всех функций - в конце позже.Или, может быть, даже FIFO.Это значит, что оркестр должен подождать и завершить все свои функции, прежде чем будет запущен новый экземпляр.
Host.json
{
"version": "2.0",
"extensions": {
"durableTask": {
"HubName": "LLGFeedTaskHub",
"ControlQueueBatchSize": 8,
"PartitionCount": 2,
"MaxConcurrentActivityFunctions": 10,
"MaxConcurrentOrchestratorFunctions": 2,
"AzureStorageConnectionStringName": "AzureWebJobsStorage"
},
"queues": {
"batchSize": 10,
"newBatchThreshold": 5
}
},
"functionTimeout": "00:10:00"
}