Цепочка произвольного числа задач вместе в C # .NET - PullRequest
0 голосов
/ 11 октября 2019

Что у меня есть

У меня есть набор методов асинхронной обработки, подобных:

public class AsyncProcessor<T>
{
    //...rest of members, etc.

    public Task Process(T input)
    {
        //Some special processing, most likely inside a Task, so
        //maybe spawn a new Task, etc.
        Task task = Task.Run(/* maybe private method that does the processing*/);
        return task;
    }
}

Что я хочу

Я бы хотел объединить их все вместе, чтобы выполнить в последовательном порядке .

Что я пытался

Я пытался сделать следующее:

public class CompositeAsyncProcessor<T>
{
    private readonly IEnumerable<AsyncProcessor<T>> m_processors;

    //Constructor receives the IEnumerable<AsyncProcessor<T>> and
    //stores it in the field above.

    public Task ProcessInput(T input)
    {
        Task chainedTask = Task.CompletedTask;

        foreach (AsyncProcessor<T> processor in m_processors)
        {
            chainedTask = chainedTask.ContinueWith(t => processor.Process(input));
        }

        return chainedTask;
    }
}

Что пошло не так

Однако задачи не запускаются в порядке , поскольку, как я понял, внутри вызова к ContinueWith вызов processor.Process(input) выполняется немедленно, и метод возвращается независимо от состояния. возвращенного задания. Поэтому все задачи обработки по-прежнему начинаются почти одновременно.

Мой вопрос

Мой вопрос заключается в том, можно ли сделать что-нибудь элегантное, чтобы связать задачи в порядке (т.е. без выполнения перекрытия). Могу ли я достичь этого с помощью следующего утверждения (например, я немного борюсь с деталями)?

chainedTask = chainedTask.ContinueWith(async t => await processor.Process(input));

Кроме того, как бы я сделал это без использованияasync / await , только ContinueWith?

Зачем мне это нужно?

Поскольку мои Processor объекты имеют доступ и запрашивают данные у "небезопасные"ресурсы. Также , я не могу просто ждать всех методов, потому что Я понятия не имею, сколько их , поэтому я не могу просто записать необходимые строки кода.

Что я имею в виду под небезопасным потоком? Конкретная проблема

Поскольку я, возможно, неправильно использую этот термин, немного лучше объяснить этот пример. Среди «ресурсов», используемых моими Processor объектами, все они имеют доступ к объекту, например:

public interface IRepository
{
    void Add(object obj);

    bool Remove(object obj);

    IEnumerable<object> Items { get; }
}

Реализация, используемая в настоящее время, относительно наивна. Поэтому некоторые Processor объекты добавляют вещи, в то время как другие извлекают Items для проверки. Естественно, одно из исключений, которое я получаю слишком часто:

InvalidOperationException: коллекция была изменена, операция перечисления может не выполняться.

Я мог бы потратить некоторое времяблокировка доступа и предварительный запуск перечислений. Тем не менее, это был второй вариант, к которому я прибегал, в то время как моей первой мыслью было просто заставить процессы запускаться последовательно.

Почему я должен использовать Задачи?

Пока у меня полный контроль над В этом случае я мог бы сказать, что для целей вопроса я не смогу изменить базовую реализацию, так что бы произошло, если бы я застрял с Задачами? Кроме того, , операции на самом деле do представляют относительно длительные операции с процессором, плюс я пытаюсь получить отзывчивый пользовательский интерфейс, поэтому мне нужно было разгрузить асинхронные операции. Будучи полезным и, в большинстве моих сценариев использования, мне не нужно было связывать несколько из них, а по одному каждый раз (или пару, но всегда конкретную и с определенным количеством, так что я смог подключить ихвместе без итераций и async / await), один из вариантов использования, наконец, потребовал связать воедино неизвестное число задач.

Как я справляюсь с этим в настоящее время

То, как я справляюсь с этимв настоящее время добавляется вызов к Wait() внутри вызова ContinueWith, то есть:

foreach (AsyncProcessor<T> processor in m_processors)
{
    chainedTask = chainedTask.ContinueWith(t => processor.Process(input).Wait());
}

Буду признателен за любую идею о том, как мне следует это сделать, или как я могу сделать это более элегантно (или"асинхронно-правильно", так сказать). Кроме того, я хотел бы знать, как я могу сделать это без async / await .

Почему мой вопрос отличается от этот вопрос , который не ответил на мой вопросполностью.

Поскольку у связанного вопроса есть две задачи, поэтому решение состоит в том, чтобы просто написать две необходимые строки, в то время как у меня есть произвольное (и неизвестное) количество задач, поэтому янужна подходящая итерация. Кроме того, мой метод не асинхронный . Теперь я понимаю (из единственного кратко доступного ответа, который был удален), что я мог бы сделать это довольно легко, если бы я изменил свой метод на async и await метод Task каждого процессора, но яЯ все еще хочу знать, как этого можно достичь без синтаксиса асинхронного ожидания / ожидания.

Почему мой вопрос не является дубликатом других связанных вопросов

Поскольку ни один из них не объясняет, как правильно составлять цепочку, используя ContinueWith, и меня интересует решение, которое использует ContinueWith, а не использует шаблон асинхронного ожидания / ожидания . Я знаю, что этот шаблон может быть предпочтительным решением, и я хочу понять, как (если возможно) создавать произвольные цепочки с использованием вызовов ContinueWith. Теперь я знаю, что мне не нужно ContinueWith. Вопрос в том, как мне это сделать с ContinueWith?

Ответы [ 2 ]

2 голосов
/ 11 октября 2019

foreach + await будет работать Process последовательно.

    public async Task ProcessInputAsync(T input)
    {
        foreach (var processor in m_processors)
        {
            await processor.Process(input));
        }
    }

Кстати. Process, должен называться ProcessAsync

1 голос
/ 11 октября 2019

Метод Task.ContinueWith не понимает асинхронные делегаты, как Task.Run, поэтому, когда вы возвращаете Task, он рассматривает это как нормальное возвращаемое значение и переноситэто в другом Task. Таким образом, вы получите Task<Task> вместо того, что ожидали получить. Проблема была бы очевидна, если бы AsyncProcessor.Process возвращал общий Task<T>. В этом случае вы получите ошибку компиляции из-за недопустимого приведения типов от Task<Task<T>> до Task<T>. В вашем случае вы разыгрываете от Task<Task> до Task, что является законным, поскольку Task<TResult> происходит от Task.

Решение проблемы легко. Вам просто нужно развернуть Task<Task> до простого Task, и есть встроенный метод Unwrap, который делает именно это.

Есть еще одна проблема, которую вынужно решить, хотя. В настоящее время ваш код подавляет все исключения, которые могут возникнуть в каждом отдельном AsyncProcessor.Process, что, я не думаю, было задумано. Таким образом, вы должны решить, какой стратегии следовать в этом случае. Собираетесь ли вы распространять первое исключение немедленно или предпочитаете кешировать их все и распространять в конце, объединенном в AggregateException, как Task.WhenAll? Пример ниже реализует первую стратегию.

public class CompositeAsyncProcessor<T>
{
    //...
    public Task Process(T input)
    {
        Task current = Task.CompletedTask;
        foreach (AsyncProcessor<T> processor in m_processors)
        {
            current = current.ContinueWith(antecessor =>
            {
                if (antecessor.IsFaulted)
                    return Task.FromException<T>(antecessor.Exception.InnerException);
                return processor.Process(input);
            },
                CancellationToken.None,
                TaskContinuationOptions.ExecuteSynchronously,
                TaskScheduler.Default
            ).Unwrap();
        }
        return current;
    }
}

Я использовал перегрузку ContinueWith, которая позволяет настраивать все параметры, поскольку значения по умолчанию не идеальны. По умолчанию TaskContinuationOptions равно None. Конфигурируя его на ExecuteSynchronously, вы минимизируете переключатели потоков, поскольку каждое продолжение будет выполняться в том же потоке, который завершил предыдущий.

Планировщик задач по умолчанию - TaskScheduler.Current. Указывая TaskScheduler.Default, вы явно указываете, что хотите, чтобы продолжения выполнялись в потоках пула потоков (для некоторых исключительных случаев, которые не смогут работать синхронно). TaskScheduler.Current зависит от контекста, и если он вас когда-нибудь удивит, это не будет хорошим способом.

Как вы видите, есть много проблем с подходом старой школы ContinueWith. Использование современного await в цикле намного проще в реализации, и намного сложнее понять его неправильно.

...