Найден обходной путь.
Сначала позвольте мне уточнить исходный вопрос. Мне нужен приостановленный конвейер, который работает в бесконечной последовательности. Трубопровод это:
- читать из последовательности синхронно:
Enum()
- параллельно обрабатывать элементы:
AsParallel().Select(a => a)
- продолжить синхронную обработку:
foreach
body
Шаг 3 может приостановить конвейер. Это подражает Sleep()
. Проблема в том, что шаг 2 извлекает слишком много элементов вперед, когда пиплейн приостановлен
У PLinq должна быть какая-то внутренняя очередь. Размер очереди не может быть настроен явно. Размер зависит от ParallelMergeOptions
. ParallelMergeOptions.NotBuffered
уменьшает размер очереди, но размер все еще слишком велик для меня.
Мой обходной путь - узнать, сколько элементов обрабатывается, остановить параллельную обработку при достижении предела, возобновить параллельную обработку при повторном запуске конвейера.
int sourceCounter;
IEnumerable<int> SourceEnum() // infinite input sequence
{
while (true)
yield return Interlocked.Increment(ref sourceCounter);
}
[TestMethod]
public void PlainPLinq_PausedConsumtionTest()
{
sourceCounter = 0;
foreach (var i in SourceEnum().AsParallel().WithMergeOptions(ParallelMergeOptions.NotBuffered).Select(a => a))
{
Thread.Sleep(3000);
break;
}
Console.WriteLine("fetched from source sequence: {0}", sourceCounter); // prints 4544 on my machine
}
[TestMethod]
public void MyParallelSelect_NormalConsumtionTest()
{
sourceCounter = 0;
foreach (var i in MyParallelSelect(SourceEnum(), 64, a => a))
{
if (sourceCounter > 1000000)
break;
}
Console.WriteLine("fetched from source sequence: {0}", sourceCounter);
}
[TestMethod]
public void MyParallelSelect_PausedConsumtionTest()
{
sourceCounter = 0;
foreach (var i in MyParallelSelect(SourceEnum(), 64, a => a))
{
Thread.Sleep(3000);
break;
}
Console.WriteLine("fetched from source sequence: {0}", sourceCounter);
}
class DataHolder<D> // reference type to store class or struct D
{
public D Data;
}
static IEnumerable<DataHolder<T>> FetchSourceItems<T>(IEnumerator<T> sourceEnumerator, DataHolder<int> itemsBeingProcessed, int queueSize)
{
for (; ; )
{
var holder = new DataHolder<T>();
if (Interlocked.Increment(ref itemsBeingProcessed.Data) > queueSize)
{
// many enought items are already being processed - stop feeding parallel processing
Interlocked.Decrement(ref itemsBeingProcessed.Data);
yield break;
}
if (sourceEnumerator.MoveNext())
{
holder.Data = sourceEnumerator.Current;
yield return holder;
}
else
{
yield return null; // return null DataHolder to indicate EOF
yield break;
}
}
}
IEnumerable<OutT> MyParallelSelect<T, OutT>(IEnumerable<T> source, int queueSize, Func<T, OutT> selector)
{
var itemsBeingProcessed = new DataHolder<int>();
using (var sourceEnumerator = source.GetEnumerator())
{
for (;;) // restart parallel processing
{
foreach (var outData in FetchSourceItems(sourceEnumerator, itemsBeingProcessed, queueSize).AsParallel().WithMergeOptions(ParallelMergeOptions.NotBuffered).Select(
inData => inData != null ? new DataHolder<OutT> { Data = selector(inData.Data) } : null))
{
Interlocked.Decrement(ref itemsBeingProcessed.Data);
if (outData == null)
yield break; // EOF reached
yield return outData.Data;
}
}
}
}