У меня есть шаблон Durable Function Fan Out и In, который не работает надежно. Оркестровка вызывается из функции Timer каждые 10 минут, но с тех пор она увеличилась до 20. Функция Activity вызывается с помощью context.CallActivityAsyn c и возвращает целое число (количество обработанных строк). В настоящее время переданные рабочие элементы должны содержать только 2 элемента для обработки. Первый элемент обрабатывает все строки и показывает полный в журнале. Второй элемент иногда показывает обрабатываемые строки, но в какой-то момент он просто останавливается ... Никакая другая активность не распознается, и «завершение» никогда не отображается в журналах. Также во втором упражнении иногда показано, что он запускается несколько раз одновременно ... Я попробовал этот точный код на моей машине разработчика, используя те же данные, и он обрабатывается до завершения, занимая не более 5 минут. Я также установил файл hosts. json на
{
"version": "2.0",
"functionTimeout": "00:10:00",
"extensions": {
"queues": {
"maxPollingInterval": "00:00:05"
}
}
}
Оркестровка:
public static async void RunOrchestrator(
[OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
{
log.LogInformation($"************** Fanning out ********************");
var parallelTasks = new List<Task<int>>();
//object[] workBatch = await context.CallActivityAsync<object[]>("GETVendors", null);
object[] workBatch = GETVendors(log); //work batch only has 2 items
for (int i = 0; i<workBatch.Length; i++)
{
Task<int> task = context.CallActivityAsync<int>("SynchVendor", workBatch[i]);
parallelTasks.Add(task);
}
log.LogInformation($"************** 'Waiting' for parallel results ********************");
await Task.WhenAll(parallelTasks);
log.LogInformation($"************** All activity functions complete ********************");
log.LogInformation($"************** fanning in ********************");
int cnt = 0;
foreach (var completedParallelActivity in parallelTasks)
{
cnt += completedParallelActivity.Result;
}
log.LogInformation($"Total Records Converted across all tasks = {cnt}");
//return outputs;
}
Функция активности
public static async Task<int> SynchVendor([ActivityTrigger] string vendor, ILogger log)
{
log.LogInformation($"SynchVendor {vendor}");
string sqlStr = Environment.GetEnvironmentVariable("Sqldb_Connection");
bool dev = Convert.ToBoolean(Environment.GetEnvironmentVariable("Dev"));
int totalCount = 0;
using (SqlConnection conn = new SqlConnection(sqlStr))
{
conn.Open();
// lets synch the vendor
Int32 limit = 200;
bool initialLoad = false;
int offset = 0;
bool done = false;
do
{
//synch logic...
// if there are rows found to have changed then send them to a queue for further processing
} while (!done);
// we are done syncing a vendor write out the vendorinfo
conn.Close();
}
log.LogInformation($"SynchVendor {vendor} Complete");
return totalCount;