Я использую Parallel.ForEach для выполнения некоторой трудоемкой обработки коллекций элементов.Обработка фактически выполняется внешним инструментом командной строки, и я не могу это изменить.Однако, похоже, что Parallel.ForEach будет «зависать» на долго работающем элементе из коллекции.Я разобрал проблему и могу показать, что Parallel.ForEach фактически ожидает завершения этого длинного и не пропускает других.Я написал консольное приложение, чтобы продемонстрировать проблему:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace testParallel
{
class Program
{
static int inloop = 0;
static int completed = 0;
static void Main(string[] args)
{
// initialize an array integers to hold the wait duration (in milliseconds)
var items = Enumerable.Repeat(10, 1000).ToArray();
// set one of the items to 10 seconds
items[50] = 10000;
// Initialize our line for reporting status
Console.Write(0.ToString("000") + " Threads, " + 0.ToString("000") + " completed");
// Start the loop in a task (to avoid SO answers having to do with the Parallel.ForEach call, itself, not being parallel)
var t = Task.Factory.StartNew(() => Process(items));
// Wait for the operations to compelte
t.Wait();
// Report finished
Console.WriteLine("\nDone!");
}
static void Process(int[] items)
{
// SpinWait (not sleep or yield or anything) for the specified duration
Parallel.ForEach(items, (msToWait) =>
{
// increment the counter for how many threads are in the loop right now
System.Threading.Interlocked.Increment(ref inloop);
// determine at what time we shoule stop spinning
var e = DateTime.Now + new TimeSpan(0, 0, 0, 0, msToWait);
// spin until the target time
while (DateTime.Now < e) /* no body -- just a hard loop */;
// count another completed
System.Threading.Interlocked.Increment(ref completed);
// we're done with this iteration
System.Threading.Interlocked.Decrement(ref inloop);
// report status
Console.Write("\r" + inloop.ToString("000") + " Threads, " + completed.ToString("000") + " completed");
});
}
}
}
По сути, я делаю массив int для хранения количества миллисекунд, которое занимает данная операция.Я установил их все на 10, кроме одного, который я установил на 10000 (так, 10 секунд).Я запускаю Parallel.ForEach в задаче и обрабатываю каждое целое число в жёстком цикле ожидания (поэтому он не должен уступать, не спать или что-то ещё).На каждой итерации я сообщаю, сколько итераций сейчас составляет в теле цикла, и сколько итераций мы завершили.В основном, это хорошо.Тем не менее, ближе к концу (по времени), он сообщает «001 потоков, 987 завершено».
Мой вопрос: почему он не использует 7 других ядер для работы на оставшихся 13 «рабочих местах»?Эта одна длительная итерация не должна мешать ей обрабатывать другие элементы в коллекции, верно?
Этот пример является фиксированной коллекцией, но его можно легко установить как перечисляемый.Мы не хотели бы прекратить получать следующий элемент в перечисляемом только потому, что он занимал много времени.