C # Parallel.ForEach заблокирован на длинной итерации - PullRequest
0 голосов
/ 04 марта 2019

Я использую Parallel.ForEach для выполнения некоторой трудоемкой обработки коллекций элементов.Обработка фактически выполняется внешним инструментом командной строки, и я не могу это изменить.Однако, похоже, что Parallel.ForEach будет «зависать» на долго работающем элементе из коллекции.Я разобрал проблему и могу показать, что Parallel.ForEach фактически ожидает завершения этого длинного и не пропускает других.Я написал консольное приложение, чтобы продемонстрировать проблему:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace testParallel
{
    class Program
    {
        static int inloop = 0;
        static int completed = 0;
        static void Main(string[] args)
        {
            // initialize an array integers to hold the wait duration (in milliseconds)
            var items = Enumerable.Repeat(10, 1000).ToArray();

            // set one of the items to 10 seconds
            items[50] = 10000;


            // Initialize our line for reporting status
            Console.Write(0.ToString("000") + " Threads, " + 0.ToString("000") + " completed");

            // Start the loop in a task (to avoid SO answers having to do with the Parallel.ForEach call, itself, not being parallel)
            var t = Task.Factory.StartNew(() => Process(items));

            // Wait for the operations to compelte
            t.Wait();

            // Report finished
            Console.WriteLine("\nDone!");
        }

        static void Process(int[] items)
        {
            // SpinWait (not sleep or yield or anything) for the specified duration
            Parallel.ForEach(items, (msToWait) =>
            {
                // increment the counter for how many threads are in the loop right now
                System.Threading.Interlocked.Increment(ref inloop);

                // determine at what time we shoule stop spinning
                var e = DateTime.Now + new TimeSpan(0, 0, 0, 0, msToWait);

                // spin until the target time
                while (DateTime.Now < e) /* no body -- just a hard loop */;

                // count another completed
                System.Threading.Interlocked.Increment(ref completed);

                // we're done with this iteration
                System.Threading.Interlocked.Decrement(ref inloop);

                // report status
                Console.Write("\r" + inloop.ToString("000") + " Threads, " + completed.ToString("000") + " completed");

            });
        }
    }
}

По сути, я делаю массив int для хранения количества миллисекунд, которое занимает данная операция.Я установил их все на 10, кроме одного, который я установил на 10000 (так, 10 секунд).Я запускаю Parallel.ForEach в задаче и обрабатываю каждое целое число в жёстком цикле ожидания (поэтому он не должен уступать, не спать или что-то ещё).На каждой итерации я сообщаю, сколько итераций сейчас составляет в теле цикла, и сколько итераций мы завершили.В основном, это хорошо.Тем не менее, ближе к концу (по времени), он сообщает «001 потоков, 987 завершено».

Мой вопрос: почему он не использует 7 других ядер для работы на оставшихся 13 «рабочих местах»?Эта одна длительная итерация не должна мешать ей обрабатывать другие элементы в коллекции, верно?

Этот пример является фиксированной коллекцией, но его можно легко установить как перечисляемый.Мы не хотели бы прекратить получать следующий элемент в перечисляемом только потому, что он занимал много времени.

1 Ответ

0 голосов
/ 04 марта 2019

Я нашел ответ (или, по крайней мере, ответ).Это связано с разбиением на части.Ответ SO здесь получил его для меня.Так что, в основном, в верхней части моей функции «Процесс», если я перехожу с этого:

        static void Process(int[] items)
        {
            Parallel.ForEach(items, (msToWait) => { ... });
        }

на это

        static void Process(int[] items)
        {
            var partitioner = Partitioner.Create(items, EnumerablePartitionerOptions.NoBuffering);
            Parallel.ForEach(partitioner, (msToWait) => { ... });
        }

, оно захватывает работу по одному за раз.Для более типичного случая параллели для каждого, когда тело не занимает больше секунды, я, конечно, могу видеть разбивку на части работы.В моем случае, однако, каждая часть тела может занять от полсекунды до 5 часов.Я, конечно, не хотел бы, чтобы группа из 10-секундных элементов разнообразия была заблокирована одним 5-часовым элементом.Так что в этом случае издержки «по одному» вполне стоят того.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...