AsParallel () и размер внутреннего буфера - PullRequest
1 голос
/ 07 декабря 2011

Как ограничить количество элементов, которые AsParallel () читает заранее и помещает в свой внутренний буфер?

Вот пример:

int returnedCounter;

IEnumerable<int> Enum()
{
    while (true)
        yield return Interlocked.Increment(ref returnedCounter);
}

[TestMethod]
public void TestMethod1()
{
    foreach (var i in Enum().AsParallel().Select(a => a))
    {
        Thread.Sleep(3000);
        break;
    }
    Console.WriteLine(returnedCounter);
}

Я потребляю 1 вещь, сплю, прекращаю перечисление. Он печатает 526400 на моей машине. В моем реальном проекте каждый элемент выделяет тысячи килобайт. AsParallel () считывает много элементов заранее, что приводит к очень плохому потреблению памяти и растрате ЦП.

Помещение WithMergeOptions (ParallelMergeOptions.NotBuffered) немного помогает. Он печатает 4544. Но это все еще слишком много для меня.

Ожидание в Enum () останавливает цикл в главном потоке.

Ответы [ 2 ]

4 голосов
/ 07 декабря 2011

Еще один вопрос по поводу Partitioners !

В вашем случае вам нужно будет найти / написать разделителя, который принимает только один элемент за раз.

Вот статья о Пользовательских разделах


UPDATE:

Я только что вспомнил, где видел реализацию SingleItemPartitioner: здесь она находится в проекте ParallelExtensionsExtras: Примеры параллельного программирования с .NET Framework

Я также только что прочитал ваш тестовый код. Я, наверное, должен был сделать это в первый раз!

Этот код:

Enum().AsParallel().Select(a => a)

означает: возьмите Enum() и перечислите его как можно быстрее, параллельно, и верните новое IEnumerable<int>.

То есть ваш foreach не извлекает элементы из Enum() - он извлекает элементы из нового IEnumerable<int>, созданного оператором linq.

Кроме того, ваш foreach работает в главном потоке, поэтому работа над каждым элементом однопоточна.

Если вы хотите работать параллельно, но выдавать предмет только тогда, когда это необходимо, попробуйте:

Parallel.ForEach( SingleItemPartitioner.Create( Enum() ), ( i, state ) =>
    {
        Thread.Sleep( 3000 );
        state.Break();
    }
0 голосов
/ 09 декабря 2011

Найден обходной путь.

Сначала позвольте мне уточнить исходный вопрос. Мне нужен приостановленный конвейер, который работает в бесконечной последовательности. Трубопровод это:

  1. читать из последовательности синхронно: Enum()
  2. параллельно обрабатывать элементы: AsParallel().Select(a => a)
  3. продолжить синхронную обработку: foreach body

Шаг 3 может приостановить конвейер. Это подражает Sleep(). Проблема в том, что шаг 2 извлекает слишком много элементов вперед, когда пиплейн приостановлен У PLinq должна быть какая-то внутренняя очередь. Размер очереди не может быть настроен явно. Размер зависит от ParallelMergeOptions. ParallelMergeOptions.NotBuffered уменьшает размер очереди, но размер все еще слишком велик для меня.

Мой обходной путь - узнать, сколько элементов обрабатывается, остановить параллельную обработку при достижении предела, возобновить параллельную обработку при повторном запуске конвейера.

int sourceCounter;

IEnumerable<int> SourceEnum() // infinite input sequence
{
    while (true)
        yield return Interlocked.Increment(ref sourceCounter);
}

[TestMethod]
public void PlainPLinq_PausedConsumtionTest()
{
    sourceCounter = 0;
    foreach (var i in SourceEnum().AsParallel().WithMergeOptions(ParallelMergeOptions.NotBuffered).Select(a => a))
    {
        Thread.Sleep(3000);
        break;
    }
    Console.WriteLine("fetched from source sequence: {0}", sourceCounter); // prints 4544 on my machine
}

[TestMethod]
public void MyParallelSelect_NormalConsumtionTest()
{
    sourceCounter = 0;
    foreach (var i in MyParallelSelect(SourceEnum(), 64, a => a))
    {
        if (sourceCounter > 1000000)
            break;
    }
    Console.WriteLine("fetched from source sequence: {0}", sourceCounter);
}

[TestMethod]
public void MyParallelSelect_PausedConsumtionTest()
{
    sourceCounter = 0;
    foreach (var i in MyParallelSelect(SourceEnum(), 64, a => a))
    {
        Thread.Sleep(3000);
        break;
    }
    Console.WriteLine("fetched from source sequence: {0}", sourceCounter);
}

class DataHolder<D> // reference type to store class or struct D
{
    public D Data;
}

static IEnumerable<DataHolder<T>> FetchSourceItems<T>(IEnumerator<T> sourceEnumerator, DataHolder<int> itemsBeingProcessed, int queueSize)
{
    for (; ; )
    {
        var holder = new DataHolder<T>();
        if (Interlocked.Increment(ref itemsBeingProcessed.Data) > queueSize)
        {
            // many enought items are already being processed - stop feeding parallel processing
            Interlocked.Decrement(ref itemsBeingProcessed.Data);
            yield break;
        }
        if (sourceEnumerator.MoveNext())
        {
            holder.Data = sourceEnumerator.Current;
            yield return holder;
        }
        else
        {
            yield return null; // return null DataHolder to indicate EOF
            yield break;
        }
    }
}

IEnumerable<OutT> MyParallelSelect<T, OutT>(IEnumerable<T> source, int queueSize, Func<T, OutT> selector)
{
    var itemsBeingProcessed = new DataHolder<int>();
    using (var sourceEnumerator = source.GetEnumerator())
    {
        for (;;) // restart parallel processing
        {
            foreach (var outData in FetchSourceItems(sourceEnumerator, itemsBeingProcessed, queueSize).AsParallel().WithMergeOptions(ParallelMergeOptions.NotBuffered).Select(
                inData => inData != null ? new DataHolder<OutT> { Data = selector(inData.Data) } : null))
            {
                Interlocked.Decrement(ref itemsBeingProcessed.Data);
                if (outData == null)
                    yield break; // EOF reached
                yield return outData.Data;
            }
        }
    }
}
...