Долгосрочная функция Azure: при объединении нескольких функций новые экземпляры запускаются до того, как какое-либо сообщение может закончить обработку - PullRequest
0 голосов
/ 01 марта 2019

У меня есть функция Azure Durable с QueueTrigger.А внутри у меня есть несколько функций Azure, которые вызываются вместе с суб-оркестровкой.Вот код

 var processKatartItem = context.GetInput<KatartItem>();

if (!processKatartItem.ProcessLeadTime && !processKatartItem.ProcessPrice && !processKatartItem.ProcessStock)
{
    await context.CallActivityWithRetryAsync("LLGFeedPreProductMessageProcessor", new RetryOptions(TimeSpan.FromSeconds(1), 5), processKatartItem);
}

var tasks = new Task[processKatartItem.vardatas.Count];

for (var i = 0; i < processKatartItem.vardatas.Count; i++)
{
    tasks[i] = context.CallSubOrchestratorWithRetryAsync("LLGFeedProductOrchestrator", new RetryOptions(TimeSpan.FromSeconds(1), 5), new ProductProcessingModel
    {
        ProcessKatartItem = processKatartItem,
        ProcessVardata = processKatartItem.vardatas[i]
    });
}

await Task.WhenAll(tasks);
await context.CallActivityWithRetryAsync("LLGFeedFinalizeMessage", new RetryOptions(TimeSpan.FromSeconds(1), 5), processKatartItem);

А вот код для SubOrchestrator

var processModel = context.GetInput<ProductProcessingModel>();

await context.CallActivityWithRetryAsync("LLGFeedProductSpecificationProcessor", new RetryOptions(TimeSpan.FromSeconds(1), 5), processModel);
await context.CallActivityWithRetryAsync("LLGFeedProductMessageProcessor", new RetryOptions(TimeSpan.FromSeconds(1), 5), processModel);
await context.CallActivityWithRetryAsync("LLGFeedProductMappingProcessor", new RetryOptions(TimeSpan.FromSeconds(1), 5), processModel);
await context.CallActivityWithRetryAsync("LLGFeedProductPriceProcessor", new RetryOptions(TimeSpan.FromSeconds(1), 5), processModel);

Как это структурировано, мы обрабатываем сообщение и его входные данные в базу данных, а затем запускаем MessageFinalization, которыйиндексирует сообщение / продукт в Elastic Search, вызывая API на нашем веб-сайте.Мы помещаем в очередь большое количество сообщений и затем обрабатываем их.

Проблема, с которой я столкнулся, заключается в том, что Durable Function выполняет первые несколько функций Azure, а затем ожидает последние несколько.Вот скриншот с панели инструментов WebJobs

enter image description here

ТАК, как вы можете видеть, что было обработано много сообщений, но не был выполнен 1 экземпляр для LLGFeedFinalizeMessage,Есть ли способ обеспечить выполнение всего конвейера вместо первых нескольких функций вначале, а всех функций - в конце позже.Или, может быть, даже FIFO.Это значит, что оркестр должен подождать и завершить все свои функции, прежде чем будет запущен новый экземпляр.

Host.json

{
    "version": "2.0",
    "extensions": {
      "durableTask": {
        "HubName": "LLGFeedTaskHub",
        "ControlQueueBatchSize": 8,
        "PartitionCount": 2,
        "MaxConcurrentActivityFunctions": 10,
        "MaxConcurrentOrchestratorFunctions": 2,
        "AzureStorageConnectionStringName": "AzureWebJobsStorage"
      },
      "queues": {
        "batchSize": 10,
        "newBatchThreshold": 5
      }
    },
    "functionTimeout": "00:10:00"
}
...