Как узнать, когда остановить параллельный foreach, где потребитель также является производителем в C # - PullRequest
0 голосов
/ 31 марта 2019

Я пытаюсь обрабатывать некоторые элементы в BlockingCollection параллельно, используя Parallel.ForEach (). При обработке предмета он может генерировать еще 0-2 предмета для обработки. Количество элементов для обработки всегда в конечном итоге достигнет 0.

Моя проблема заключается в том, что, поскольку потребитель также является производителем (элементы обработки могут генерировать больше элементов для обработки), я не могу вызвать функцию CompleteAdding () из BlockingCollection, когда коллекция BlockingCollection пуста, поскольку могут быть другие потоки, которые в настоящее время обрабатывают элемент, который будет генерировать больше предметов. Поэтому я не знаю, как сообщить BlockingCollection / Parallel.ForEach, что он может завершиться.

Вот пример ситуации (модифицированной для простоты)

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace Example
{
    class Example
    {
        static void Main(string[] args)
        {
            var process = new BlockingCollection<int>() { 30 };

            var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };

            Parallel.ForEach(process.GetConsumingEnumerable(), parallelOptions, item =>
            {
                if (item > 20)
                {
                    // Some add 2 items
                    process.Add(item - 1);
                    process.Add(item - 1);
                    Console.WriteLine($"process.Count: {process.Count} | item: {item} | Added: 2");
                }
                else if (item > 10)
                {
                    // Some add 1 item
                    process.Add(item-1);
                    Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 1");
                }
                else
                {
                    // Some add 0 items
                    Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 0");
                }
            });

            // Parallel.ForEach never exits
            Console.WriteLine("Completed Processing");

            Console.ReadKey();
        }
    }
}

Я пытался изменить MaxDegreeOfParallelism во время Parallel.ForEach до минимума количества элементов для обработки и Environment.ProcessorCount, но это ничего не делает во время Parallel.ForEach.

Я также попытался сохранить счетчик количества необработанных элементов и выполнить блокировку при обновлении этого числа в каждом потоке. Когда необработанные элементы равны 0, тогда я вызываю метод AddingCompleted. Это тоже не работает.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace Example
{
    class Example
    {
        static void Main(string[] args)
        {
            var runningLock = new object();
            int running = 0;

            var process = new BlockingCollection<int>() { 30 };

            var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };

            Parallel.ForEach(process.GetConsumingEnumerable(), parallelOptions, item =>
            {
                lock (runningLock)
                {
                    running++;
                }

                if (item > 20)
                {
                    // Some add 2 items
                    process.Add(item - 1);
                    process.Add(item - 1);
                    Console.WriteLine($"process.Count: {process.Count} | item: {item} | Added: 2 | running: {running}");
                }
                else if (item > 10)
                {
                    // Some add 1 item
                    process.Add(item - 1);
                    Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 1 | running: {running}");
                }
                else
                {
                    // Some add 0 items
                    Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 0 | running: {running}");
                }

                lock (runningLock)
                {
                    running--;

                    if (running == 0 && process.Count == 0)
                    {
                        Console.WriteLine($"Stopping | running: {running} | process.Count: {process.Count}");
                        process.CompleteAdding();
                    }
                }
            });

            // Parallel.ForEach never exits
            Console.WriteLine("Completed Processing");

            Console.ReadKey();
        }
    }
}

Должен ли я использовать что-то другое вместо Parallel.ForEach для этого?

Кроме того, при установке MaxDegreeOfParallelism в 1. Если начальный элемент BlockingCollection>> 27, он обрабатывает все нормально, однако, если он <= 26, он останавливает обработку элементов около 16? Кроме того, более высокий MaxDegreeOfParallelism приводит к остановке обработки элементов с меньшим числом. </p>

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace Example
{
    class Example
    {
        static void Main(string[] args)
        {
            // Normal
            var process = new BlockingCollection<int>() { 27 };
            // Stops around 16
            //var process = new BlockingCollection<int>() { 26 };

            var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = 1 };

            Parallel.ForEach(process.GetConsumingEnumerable(), parallelOptions, item =>
            {
                if (item > 20)
                {
                    // Some add 2 items
                    process.Add(item - 1);
                    process.Add(item - 1);
                    Console.WriteLine($"Process Size: {process.Count} | Current Num: {item} | Added: 2");
                }
                else if (item > 10)
                {
                    // Some add 1 item
                    process.Add(item - 1);
                    Console.WriteLine($"Process Size: {process.Count}| Current Num: {item} | Added: 1");
                }
                else
                {
                    // Some add 0 items
                    Console.WriteLine($"Process Size: {process.Count}| Current Num: {item} | Added: 0");
                }
            });

            // Parallel.ForEach never exits
            Console.WriteLine("Completed Processing");

            Console.ReadKey();
        }
    }
}

Вот фактический код , если кто-то предпочитает смотреть на реальный код вместо абстрактной версии.

1 Ответ

0 голосов
/ 03 апреля 2019

Вы были на правильном пути с этим:

Я также попытался сохранить счетчик количества необработанных элементов и выполнить блокировку при обновлении этого числа в каждом потоке. Когда необработанные элементы равны 0, тогда я вызываю метод AddingCompleted.

Проблема в том, что вы фактически подсчитываете количество активных работников, а не количество необработанных предметов. То есть Вы только увеличиваете свой счетчик, когда начинаете что-то обрабатывать, поэтому в очереди может быть много других элементов, не представленных этим счетчиком. Для этого вам нужно увеличить счетчик при каждом добавлении чего-либо в очередь, а затем уменьшить счетчик при каждом завершении обработки чего-либо из очереди.

Теперь, если бы вы попробовали это, вы, вероятно, столкнулись бы с другой проблемой: по умолчанию метод Parallel.ForEach() пытается пакетировать элементы из источника. Это плохо работает с источником, таким как BlockingCollection<T>, который может блокироваться во время перечисления, ожидая дополнительных данных. В вашем примере это приводит к тупику, когда Parallel.ForEach() ожидает большего количества элементов, прежде чем он поставит в очередь самый последний пакет, в то время как BlockingCollection<T> ожидает большего количества элементов для обработки и, таким образом, вызывает добавление большего количества элементов в очередь.

С помощью метода ForEach(), ожидающего коллекцию, и коллекции, ожидающей метод ForEach(), вы получаете тупик.

Однако для этого есть исправление: вы можете предоставить ForEach() разделитель, который специально настроен не для буферизации данных, а для постановки в очередь рабочих элементов сразу по мере их получения.

Соединяя эти две стратегии, вы получаете версию своего кода, которая выглядит примерно так (с некоторыми незначительными изменениями в выводе, которые я добавил для диагностических целей):

static void Main(string[] args)
{
    const int firstValue = 30;
    const int secondValues = 20;
    const int thirdValues = 10;

    var process = new BlockingCollection<int>() { firstValue };

    var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };
    int totalItemCount = process.Count;

    OrderablePartitioner<int> partitioner = Partitioner.Create(process.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);

    Parallel.ForEach(partitioner, parallelOptions, (item, state, i) =>
    {
        string message;

        if (item > secondValues)
        {
            // Some add 2 items
            Interlocked.Add(ref totalItemCount, 2);
            process.Add(item - 1);
            process.Add(item - 1);
            message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count} | item: {item} | Added: 2";
        }
        else if (item > thirdValues)
        {
            // Some add 1 item
            Interlocked.Increment(ref totalItemCount);
            process.Add(item - 1);
            message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count}| item: {item} | Added: 1";
        }
        else
        {
            // Some add 0 items
            message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count}| item: {item} | Added: 0";
        }

        int newCount = Interlocked.Decrement(ref totalItemCount);

        if (newCount == 0)
        {
            process.CompleteAdding();
        }

        Console.WriteLine($"{message} | newCount: {newCount} | i: {i}");
    });

    // Parallel.ForEach will exit
    Console.WriteLine("Completed Processing");    
    Console.ReadKey();
}
...