Parallel.ForEach с asyn c лямбда-ожиданием для завершения всех итераций - PullRequest
2 голосов
/ 24 февраля 2020

недавно я видел несколько SO-потоков, связанных с Parallel.ForEach, смешанных с asyn c lambdas, но все предложенные ответы были своего рода обходными путями.

Есть ли способ, как я мог написать:

List<int> list = new List<int>[]();

Parallel.ForEach(arrayValues, async (item) =>
{
  var x = await LongRunningIoOperationAsync(item);
  list.Add(x);
});

Как я могу гарантировать, что список будет содержать все элементы всех итераций, выполненных с лямбдами в каждой итерации?

Как обычно Parallel.ForEach будет работать с асинхронными c лямбдами, если он нажмет await, передаст ли он свой поток на следующую итерацию?

Я предполагаю, что поле ParallelLoopResult IsCompleted не является правильным, так как он вернет true, когда все итерации выполнены, независимо от того, завершены ли их реальные лямбда-задания или нет?

Ответы [ 2 ]

7 голосов
/ 24 февраля 2020

недавно я видел несколько SO-потоков, связанных с Parallel.ForEach, смешанных с асин * лямбдами c, но все предложенные ответы были своего рода обходными путями.

Ну, это потому что Parallel не работает с async. И с другой точки зрения, почему вы хотите смешать их в первую очередь? Они делают противоположные вещи . Parallel - это добавление потоков, а async - отказ от потоков. Если вы хотите выполнять асинхронную работу одновременно, используйте Task.WhenAll. Это правильный инструмент для работы; Parallel это не так.

Тем не менее, звучит так, как будто вы хотите использовать не тот инструмент, вот как вы это делаете ...

Как я могу обеспечить этот список будет содержать все элементы из всех итераций, выполненных с лямбдами в каждой итерации?

Вам понадобится какой-то сигнал, который может блокировать некоторый код до завершения обработки, например, CountdownEvent или Monitor. Кроме того, вам необходимо защитить доступ к не-поточно-безопасному List<T>.

Как обычно Parallel.ForEach будет работать с асинхронными c лямбдами, если это Удар в ожидании, передаст ли он свой поток на следующую итерацию?

Поскольку Parallel не понимает async лямбда-выражений, когда первый await возвращает (возвращается) своему вызывающему, Parallel будет предполагать, что целое число l oop завершено.

Я предполагаю, что поле ParallelLoopResult IsCompleted не является правильным, так как оно вернет true, когда все итерации выполнены, независимо от того, является ли их действительная лямбда задания завершены или нет?

Правильно. Насколько известно Parallel, он может «видеть» только метод первого await, который возвращает его вызывающему. Так что он не знает, когда лямбда будет завершена. Он также будет предполагать, что итерации завершены слишком рано, что приводит к удалению разделов.

3 голосов
/ 24 февраля 2020

Вам не нужно Parallel.For/ForEach здесь вам просто нужно дождаться списка задач.

Фон

Короче говоря, вы должны быть очень осторожны с asyn c lambdas , и если вы передаете их Action или Func<Task>

Ваша проблема в том, что Parallel.For / ForEach не подходит для asyn c и шаблона ожидания или IO-связанных задач . Они подходят для рабочих нагрузок, связанных с процессором . Это означает, что они по существу имеют параметры Action и давайте планировщик задач создадут для вас задач

Если вы хотите запустить несколько asyn c задач одновременно используют Task.WhenAll или TPL Dataflow Block (или что-то подобное), которые могут эффективно работать как с CPU связанный и IO bound работает с нагрузками, или, говоря более прямо, они могут иметь дело с tasks , что и есть asyn c метод .

Если вам не нужно делать больше внутри своей лямбды (для которой вы не показали), просто используйте Select и WhenAll

var tasks = items .Select(LongRunningIoOperationAsync);
var results = await Task.WhenAll(tasks); // here is your list of int

Если вы это сделаете, вы все еще можно использовать await,

var tasks = items.Select(async (item) =>
   {
       var x = await LongRunningIoOperationAsync(item);
       // do other stuff
       return x;
   });

var results = await Task.WhenAll(tasks);

Примечание : если вам нужна расширенная функциональность Parallel.ForEach (а именно Опции для управления максимальным параллелизмом), есть Есть несколько подходов, однако RX или DataFlow могут быть наиболее краткими

...