Цикл Task.Run с внешними параметрами изменяется циклом - PullRequest
1 голос
/ 26 апреля 2019

У меня есть этот код (упрощенный) для обработки массива параметров в 100 различных параллельных потоках, но переменные x и y изменяются циклом внутри потоков, пока они используются в потоках. Если я запускаю функцию с 1 потоком, то она работает.

Я также попытался поместить параметры в ConcurrentBag и сделать цикл с foreach, но с тем же результатом параметры смешиваются в потоках.

List<Task> tasks = new List<Task>();
var listConcurentBag = new ConcurrentBag<int>();
int nThreadCount = 0;

for (x=0; x<1000; x++)
  for (y=0; y<1000; y++)
  {
     int x1=x;
     int y2=y;

     Task t = Task.Run(() =>
     {         
        int param1=x1;
        int param2=y2;

        // some calculations with param1 and param2

        listConcurentBag.Add(result);
     }); // tasks

     tasks.Add(t);
     nThreadCount++;

   if (nThreadCount == 100) // after 100 threads started, wait
   {
       nThreadCount = 0;
       Task.WaitAll(tasks.ToArray());
       tasks.Clear();
   }
 }

Ответы [ 2 ]

2 голосов
/ 27 апреля 2019

Вы должны использовать Microsoft Reactive Framework (он же Rx) - NuGet System.Reactive и добавить using System.Reactive.Linq; - тогда вы можете сделать это:

var query =
    from x in Observable.Range(0, 1000)
    from y in Observable.Range(0, 1000)
    from r in Observable.Start(() => GetResult(x,  y))
    select new { x, y, r };

IDisposable subscription =
    query
        .Buffer(100)
        .Subscribe(results =>
        {
            /* do something with each buffered list of results */
        });

Теперь это не совсем то же самое, что ваш текущий код, но он дает вам блоки с 100 результатами, как только они становятся доступными, используя максимальную емкость пула потоков.

Вы можете изменить его, чтобы установить параллелизм следующим образом:

var query =
    from x in Observable.Range(0, 1000)
    from y in Observable.Range(0, 1000)
    select Observable.Start(() => new { x, y, r = GetResult(x,  y) });

IDisposable subscription =
    query
        .Merge(maxConcurrent: 100) // limit to 100 threads
        .Buffer(count: 100) // produce 100 results at a time
        .Subscribe(results =>
        {
            /* do something with the list of results */
        });

Если вы хотите остановить код до его естественного завершения, просто наберите subscription.Dispose();.

Rx имеет тенденцию производить намного более чистый код, ИМХО.

1 голос
/ 26 апреля 2019

У меня есть предложение для альтернативной реализации, которое вы можете найти или не найти подходящим для ваших нужд.Вместо обработки задач партиями по 100 вы можете выразить вложенные циклы for как одно перечислимое, а затем передать его встроенному методу Parallel.ForEach для выполнения работы параллелизма.

private IEnumerable<(int, int)> GetNestedFor()
{
    for (int x = 0; x < 1000; x++)
    {
        for (int y = 0; y < 1000; y++)
        {
            yield return (x, y); // return a ValueTuple<int, int>
        }
    }
}

ThreadPool.SetMinThreads(100, 100);
var options = new ParallelOptions() { MaxDegreeOfParallelism = 100 };
Parallel.ForEach(GetNestedFor(), options, item =>
{
    int param1 = item.Item1;
    int param2 = item.Item2;
    Console.WriteLine($"Processing ({param1}, {param2})");
    Thread.Sleep(100); // Simulate some work
});

Вывод:

Обработка (0, 1)
Обработка (0, 2)
Обработка (0, 0)
Обработка (0, 3)
...
Обработка (0, 998)
Обработка (0, 997)
Обработка (0, 999)
Обработка (1, 0)
Обработка (1,1)
...
Обработка (999, 999)
Обработка (999, 998)

...