Можно ли использовать Task.Run в ForEachAsync? - PullRequest
1 голос
/ 17 марта 2019

Мы используем метод ForEachAsync из Вложенности ожидают в Parallel.ForEach , первоначально , предложенный Стивеном Таубом (внизу его блоба).

public static async Task ForEachAsync<T>(
        this IEnumerable<T> source, int degreeOfParallelism, Func<T, Task> body, Action<Task> handleException = null)
    {
        if (source.Any())
        {
            await Task.WhenAll(
                from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
                select Task.Run(async delegate
                {
                    using (partition)
                        while (partition.MoveNext())
                            await body(partition.Current).ContinueWith(t =>
                            {
                                //observe exceptions
                                if (t.IsFaulted)
                                {
                                    handleException?.Invoke(t);
                                }
                            });
                }));
        }
    }

Но у одного из наших коллег есть озабоченность по поводу издержек Task.Run, описанных в серии публикаций Стивена Клири https://blog.stephencleary.com/2013/11/taskrun-etiquette-examples-even-in.html

Есть (по крайней мере) четыре проблемы с эффективностью, возникшие, как тольковы используете await с Task.Run в ASP.NET:
• Дополнительное (ненужное) переключение потоков в поток пула потоков Task.Run.Аналогичным образом, когда этот поток завершает запрос, он должен ввести контекст запроса (который не является фактическим переключателем потока, но имеет служебные данные).
• Создается дополнительный (ненужный) мусор.Асинхронное программирование - это компромисс: вы получаете повышенную скорость отклика за счет более высокого использования памяти.В этом случае вы в конечном итоге создаете больше мусора для асинхронных операций, что совершенно не нужно.• Эвристика пула потоков ASP.NET сбрасывается Task.Run «неожиданно» заимствуя поток пула потоков.У меня нет большого опыта здесь, но мой инстинкт инстинкта подсказывает мне, что эвристика должна хорошо восстанавливаться, если неожиданная задача действительно короткая, и не справится с ней так элегантно, если неожиданная задача длится более двух секунд.
• ASP.NET не может завершить запрос досрочно, т. Е. Если клиент отключается или время ожидания истекло.В синхронном случае ASP.NET знал поток запроса и мог прервать его.В асинхронном случае ASP.NET не знает, что поток вторичного пула потоков «предназначен» для этого запроса.Это можно исправить с помощью токенов отмены, но это выходит за рамки этого сообщения в блоге.

У меня вопрос: можно ли использовать Task.Run для ForEachAsync или существует лучший способ запуска несколькихасинхронные задачи параллельно с управляемой собакой (степень параллелизма)?Например, я хочу обработать 400 элементов, при этом параллельно выполняется не более 100 элементов.

Мы используем метод ForEachAsync как в средах .Net, так и .Net Core, поэтому если ответы для разных сред будутотличаться, я буду рад узнать об обоих.

Обновление, чтобы уточнить технологии, которые мы используем:
У нас есть службы Windows / консоли (написано в .Net4.6.1), которые читают тысячи записей из БДа затем публикуем их по отдельности параллельно (например, dop = 100) в сервис веб-API (мы планировали отправлять их партиями, но пока не реализовали).
У нас также есть службы Asp.Net Core с сервисом фонового хостинга, которыйрегулярно (например, каждые 10 секунд) перетаскивает страницу элементов (например, до 400), а затем параллельно (например, dop = 100) сохраняет их в отдельные BLOB-объекты Azure.

1 Ответ

1 голос
/ 17 марта 2019

Простым способом асинхронной обработки 400 сообщений с MDOP 100 будет использование ActionBlock<T>. Примерно так будет работать:

public class ActionBlockExample
{
    private ActionBlock<int> actionBlock;

    public ActionBlockExample()
    {
        actionBlock = new ActionBlock<int>(x => ProcessMsg(x), new ExecutionDataflowBlockOptions()
        {
            MaxDegreeOfParallelism = 100
        });
    }

    public async Task Process()
    {
        foreach (var msg in Enumerable.Range(0, 400))
        {
            await actionBlock.SendAsync(msg);
        }
        actionBlock.Complete();
        await actionBlock.Completion;
    }

    private Task ProcessMsg(int msg) => Task.Delay(100);
}

ActionBlock по умолчанию имеет несвязанный входной буфер и принимает все 400 сообщений, обрабатывая максимум 100 параллельно. Здесь нет необходимости в Task.Run, так как все сообщения обрабатываются в фоновом режиме.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...