Только разветвление (и забыть) в длительных функциях - PullRequest
0 голосов
/ 17 мая 2018

У меня есть приложение-функция с двумя функциями и очередью хранения.F1 вызывается сообщением в теме служебной шины.Для каждого полученного сообщения F1 вычисляет некоторые подзадачи (T1, T2, ...), которые должны выполняться с различной задержкой.Ex - T1 срабатывает через 3 минуты, T2 - через 5 минут и т. Д. F1 отправляет сообщения в очередь хранения с соответствующими тайм-аутами видимости (для имитации задержки), и F2 запускается всякий раз, когда сообщение видно в очереди.Все работает отлично.

Теперь я хочу перенести это приложение в режим «Durable Functions».F1 теперь только запускает оркестратор.Код оркестратора выглядит следующим образом:

    public static async Task Orchestrator([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
    {
        var results = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput");
        List<Task> tasks = new List<Task>();
        foreach (var value in results)
        {
            var pnTask = context.CallActivityAsync("PerformSubTask", value);
            tasks.Add(pnTask);
        }

        //dont't await as we want to fire and forget. No fan-in!
        //await Task.WhenAll(tasks);
    }

    [FunctionName("PerformSubTask")]
    public async static Task Run([ActivityTrigger]TaskInfo info, TraceWriter log)
    {
         TimeSpan timeDifference = DateTime.UtcNow - info.Origin.ToUniversalTime();
         TimeSpan delay = TimeSpan.FromSeconds(info.DelayInSeconds);
         var actualDelay = timeDifference > delay ? TimeSpan.Zero : delay - timeDifference;

         //will still keep the activity function running and incur costs??
         await Task.Delay(actualDelay);

         //perform subtask work after delay! 
    }

Я хотел бы только разойтись (без разветвления для сбора результатов) и запустить подзадачи.Оркестратор запускает все задачи и избегает вызова «await Task.WhenAll».Функция действия вызывает «Task.Delay», чтобы подождать указанное количество времени, а затем выполняет свою работу.

Мои вопросы

  • Имеет ли смысл использовать для этого Durable Functionsрабочий процесс?
  • Является ли это правильным подходом для организации рабочего процесса "Разветвления"?
  • Мне не нравится тот факт, что функция действия выполняется в течение указанного периода времени (3 или 5 минут).) ничего не делать.Это будет сопряжено с расходами, или?
  • Также, если требуется задержка более 10 минут, невозможно , чтобы функция активности могла успешно работать с этим подходом!
  • Моя ранняя попытка избежать этого состояла в том, чтобы использовать «CreateTimer» в оркестраторе, а затем добавить действие в качестве продолжения, но я вижу только записи таймера в таблице «История».Продолжение не стреляет!Нарушаю ли я ограничение для кода оркестратора - «Код Orchestrator никогда не должен инициировать какую-либо асинхронную операцию»?

    foreach (var value in results)
    {
            //calculate time to start
            var timeToStart = ;
            var pnTask = context.CreateTimer(timeToStart , CancellationToken.None).ContinueWith(t => context.CallActivityAsync("PerformSubTask", value));
            tasks.Add(pnTask);
    }
    

    ОБНОВЛЕНИЕ : используя подход, предложенный Крисом

    Деятельность, которая вычисляет подзадачи и задержки

    [FunctionName("CalculateTasks")]
    public static List<TaskInfo> CalculateTasks([ActivityTrigger]string input,TraceWriter log)
    {
        //in reality time is obtained by calling an endpoint 
        DateTime currentTime = DateTime.UtcNow;
        return new List<TaskInfo> {
            new TaskInfo{ DelayInSeconds = 10, Origin = currentTime },
            new TaskInfo{ DelayInSeconds = 20, Origin = currentTime },
            new TaskInfo{ DelayInSeconds = 30, Origin = currentTime },
        };
    }
    
    public static async Task Orchestrator([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
    {
        var results = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput");
        var currentTime = context.CurrentUtcDateTime;
        List<Task> tasks = new List<Task>();
        foreach (var value in results)
        {
            TimeSpan timeDifference = currentTime - value.Origin;
            TimeSpan delay = TimeSpan.FromSeconds(value.DelayInSeconds);
            var actualDelay = timeDifference > delay ? TimeSpan.Zero : delay - timeDifference;
    
            var timeToStart = currentTime.Add(actualDelay);
    
            Task delayedActivityCall = context
                 .CreateTimer(timeToStart, CancellationToken.None)
                 .ContinueWith(t => context.CallActivityAsync("PerformSubtask", value));
            tasks.Add(delayedActivityCall);
        }
    
        await Task.WhenAll(tasks);
    }
    

Кажется, что просто планирование задач изнутри оркестратора. В моем случае я вычисляю задачи и задержки в другой деятельности(CalculateTasks) перед циклом.Я хочу, чтобы задержки рассчитывались с использованием «текущего времени», когда выполнялась операция.Я использую DateTime.UtcNow в деятельности.Это как-то не очень хорошо при использовании в оркестраторе.Действия, указанные в «ContinueWith», просто не запускаются, и оркестратор всегда находится в состоянии «Выполнено».

Могу ли я не использовать время, записанное действием из оркестратора?

ОБНОВЛЕНИЕ 2

Так что обходной путьпредложенные Крисом работы!

Так как я не хочу собирать результаты действий, я избегаю звонить '1048 *' после планирования всех действий.Я делаю это для того, чтобы уменьшить конкуренцию в очереди управления, т. Е. Иметь возможность запустить другую оркестровку, если требуется.Тем не менее, статус «оркестратор» по-прежнему «1049 * Выполняется », пока все действия не завершатся.Я предполагаю, что он перемещается в ' Complete ' только после того, как последнее действие отправляет сообщение 'done' в очередь управления.

Я прав?Есть ли способ освободить оркестратора раньше, то есть сразу после планирования всех действий?

Ответы [ 3 ]

0 голосов
/ 17 мая 2018

await Task.Delay - определенно не лучший вариант: вы заплатите за это время, пока ваша функция не сделает ничего полезного.Максимальная задержка также ограничена 10 минутами в плане потребления.

На мой взгляд, необработанные сообщения очереди - лучший вариант для сценариев "забей и забудь".Установите правильные тайм-ауты видимости, и ваш сценарий будет работать надежно и эффективно.

Убийственная функция Durable Functions - await s, которые делают свою магию из приостановки и возобновления при сохранении объема.Таким образом, это отличный способ реализовать фан-ин, но вам это не нужно.

0 голосов
/ 18 мая 2018

Подход ContinueWith отлично работал для меня.Мне удалось смоделировать версию вашего сценария, используя следующий код оркестратора:

[FunctionName("Orchestrator")]
public static async Task Orchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context,
    TraceWriter log)
{
    var tasks = new List<Task>(10);
    for (int i = 0; i < 10; i++)
    {
        int j = i;
        DateTime timeToStart = context.CurrentUtcDateTime.AddSeconds(10 * j);
        Task delayedActivityCall = context
            .CreateTimer(timeToStart, CancellationToken.None)
            .ContinueWith(t => context.CallActivityAsync("PerformSubtask", j));
        tasks.Add(delayedActivityCall);
    }

    await Task.WhenAll(tasks);
}

И вот что стоит, вот код функции активности.

[FunctionName("PerformSubtask")]
public static void Activity([ActivityTrigger] int j, TraceWriter log)
{
    log.Warning($"{DateTime.Now:o}: {j:00}");
}

Из журналаВ результате я увидел, что все вызовы активности выполняются на расстоянии 10 секунд друг от друга.

Другой подход заключается в том, чтобы развернуться к нескольким под-оркестрациям (как предложено @jeffhollan), которые представляют собой простую короткую последовательность длительного таймера.задержка и ваш рабочий звонок.

ОБНОВЛЕНИЕ Я попытался использовать ваш обновленный образец и смог воспроизвести вашу проблему!Если вы выполняете локально в Visual Studio и настраиваете параметры исключений, чтобы они всегда прерывались на исключениях, вы должны увидеть следующее:

System.InvalidOperationException : 'Обнаружено многопоточное выполнение.Это может произойти, если код функции оркестратора ожидает выполнения задачи, которая не была создана методом DurableOrchestrationContext.Более подробную информацию можно найти в этой статье https://docs.microsoft.com/en-us/azure/azure-functions/durable-functions-checkpointing-and-replay#orchestrator-code-constraints.'

Это означает, что поток, вызвавший context.CallActivityAsync("PerformSubtask", j), был , а не такой же, как поток, который вызвал функцию оркестратора.Я не знаю, почему мой первоначальный пример не ударил это, или почему ваша версия сделала.Это как-то связано с тем, как TPL решает, какой поток использовать для запуска вашего ContinueWith делегата - что-то, что мне нужно изучить подробнее.

Хорошая новость заключается в том, что существует простой обходной путь, который заключается вукажите TaskContinuationOptions.ExecuteSynchronously , например:

Task delayedActivityCall = context
    .CreateTimer(timeToStart, CancellationToken.None)
    .ContinueWith(
        t => context.CallActivityAsync("PerformSubtask", j),
        TaskContinuationOptions.ExecuteSynchronously);

Пожалуйста, попробуйте это и дайте мне знать, если это решит проблему, которую вы наблюдаете.

В идеале вы бы не сталиЭто необходимо сделать при использовании Task.ContinueWith.Я открыл вопрос в GitHub для отслеживания этого: https://github.com/Azure/azure-functions-durable-extension/issues/317

Поскольку я не хочу собирать результаты действий, я избегаю звонить await Tasks.WhenAll(tasks) после планирования всех действий.Я делаю это для того, чтобы уменьшить конкуренцию в очереди управления, т. Е. Иметь возможность запустить другую оркестровку, если требуется.Тем не менее, статус «оркестратор» по-прежнему «работает», пока все действия не закончат работу.Я предполагаю, что он переходит в «Выполнено» только после того, как последнее действие отправляет сообщение «Готово» в очередь управления.

Это ожидается.Функции Orchestrator фактически никогда не завершаются, пока не будут выполнены все нерешенные долговременные задачи.Нет никакого способа обойти это.Обратите внимание, что вы по-прежнему можете запускать другие экземпляры оркестратора, может возникнуть конфликт, если они окажутся в одном и том же разделе (по умолчанию 4 раздела).

0 голосов
/ 17 мая 2018

Я думаю, что долговечность определенно имеет смысл для этого рабочего процесса.Я думаю, что лучшим вариантом было бы использовать функцию задержки / таймера, как вы сказали, но, основываясь на синхронном характере выполнения, я не думаю, что добавлю все в список задач, который действительно ожидает .WhenAll() или .WhenAny(), к которым вы не стремитесь.Я думаю, что лично я бы просто сделал последовательный цикл foreach с задержками таймера для каждой задачи.Так что псевдокод из:

for(int x = 0; x < results.Length; x++) { await context.CreateTimer(TimeSpan.FromMinutes(1), ...); await context.CallActivityAsync("PerformTaskAsync", results[x]); }

Вам нужны эти ожидающие там, независимо от того, так что просто избегание await Task.WhenAll(...) может вызвать некоторые проблемы в приведенном выше примере кода.Надеюсь, это поможет

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...