Подводные камни в попытке использовать PLINQ над длительно работающими генераторами? - PullRequest
6 голосов
/ 25 января 2012

У меня есть несколько методов бесконечного генератора, в том числе некоторые длительно и бесконечно долго работающие генераторы.

IEnumerable<T> ExampleOne() { 
    while(true) // this one blocks for a few seconds at a time
        yield return LongRunningFunction();
}
IEnumerable<T> ExampleTwo() { 
    while(true) //this one blocks for a really long time
        yield return OtherLongRunningFunction();
}

Моя цель - создать бесконечную последовательность, которая объединяет элементы из двух примеров.Вот что я попробовал, используя PLINQ:

 IEnumerable<T> combined =  new[] { ExampleOne(), ExampleTwo() }
           .AsParallel()
           .WithMergeOptions(ParallelMergeOptions.NotBuffered)
           .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
           .SelectMany(source => source.GetRequests());

Кажется, что это правильно объединяет два IEnumerables в новый, с элементами из IEnumerable # 1 и # 2, доступными всякий раз, когда они появляются в любом из двухsource IEnumerables:

//assuming ExampleTwo yields TWO but happens roughly 5 times 
//less often then ExampleOne
Example output:  one one one one one TWO one one one one one one TWO

Однако, кажется, что иногда (обычно после многих часов пробега) OtherLongRunningFunction() будет идти в течение длительного периода времени, не возвращаясь, и подусловия, которые трудно воспроизвести, последовательность combined будет блокировать ее, а не продолжать возвращать результаты первого LongRunningFunction.Кажется, что хотя комбинированный параллельный запрос начинался с двух потоков, он решил переключиться на один поток позже.

Моя первая мысль была: «Это, вероятно, работа для RX Observable.Merge, а не для PLINQ."Но я был бы признателен за оба ответа, которые показывают правильные альтернативные способы решения этой ситуации, а также объяснения о механике того, как PLINQ может изменять степень параллелизма через несколько часов после начала запроса.

Ответы [ 3 ]

2 голосов
/ 25 января 2012

Вот способ Rx сделать это, и он действительно использует Merge:

IObservable<T> LongRunningFunction()
{
    return Observable.Start(() => {
        // Calculate some stuff
        return blah;
    }, Scheduler.TaskPoolScheduler);
}

Observable.Merge(
    Observable.Defer(LongRunningFunction).Repeat(),
    Observable.Defer(OtherLongRunningFunction).Repeat(),
).Subscribe(x => {
    Console.WriteLine("An item: {0}", x);
});
1 голос
/ 27 сентября 2013

Что касается механики PLINQ:

Я сталкиваюсь с той же проблемой: у меня есть последовательность, элементы которой требуют неравного времени обработки, некоторые из которых длиннее на несколько порядков.Я испытываю недостаток потоков, гораздо более воспроизводимый на 8-ядерном процессоре, чем на 4-ядерном, хотя это может произойти и на 4-ядерном процессоре после многих часов обработки.Некоторые темы могут возобновить работу через некоторое время.Обратите внимание, что используется динамическое разбиение на блоки, как в примере.

Наблюдение: голодание, скорее всего, происходит при завершении последовательных очень долго выполняющихся рабочих элементов.

Тема MSDN Параллельные циклы проливает некоторый свет:

Будьте осторожны, если вы используете параллельные циклы с отдельными шагами, которые занимают несколько секунд или более.Это может происходить при нагрузках, связанных с вводом / выводом, а также при длительных вычислениях.Если циклы занимают много времени, вы можете столкнуться с неограниченным ростом рабочих потоков из-за эвристики для предотвращения истощения потоков, используемой логикой внедрения потоков класса .NET ThreadPool.Эта эвристика постоянно увеличивает количество рабочих потоков, когда рабочие элементы текущего пула выполняются в течение длительного периода времени.Мотивация заключается в добавлении большего количества потоков в случаях, когда все в пуле потоков заблокировано.К сожалению, если работа на самом деле продолжается, больше потоков может не соответствовать вашим ожиданиям..NET Framework не может различить эти две ситуации.

Я до сих пор не знаю деталей, но я думаю, что базовая эвристика ThreadPool не дает оснований для очень долго выполняющихся рабочих элементовне в состоянии разместить потоки для следующих итераций из-за некоего правильно адаптированного верхнего предела, поэтому итерации стоят в очереди.У меня нет доступа Visual Studio к 8-ядерному компьютеру, где проблема легче воспроизводится.Я еще не смог воспроизвести проблему при отладке Visual Studio на 4-ядерном компьютере.Расследование продолжается.

Для получения более подробной информации, "Учитывает ли параллельная задача библиотека (или PLINQ) другие процессы?" тема весьма актуальна.

1 голос
/ 25 января 2012

Если вам нужны преимущества TPL, особенно для задач с различными нагрузками (что произойдет, когда будут созданы ваши блоки подписки и несколько элементов - прекратите ли вы давать элементы?), Я рекомендую TPL DataFlow .

Если вы хотите сделать это с Rx, для действительно длительных вычислительных задач лучше не блокировать пул потоков:

var stream = Observable.Merge(ExampleTwo().ToObservable(Scheduler.NewThread), ExampleOne().ToObservable(Scheduler.NewThread));

stream.Subscribe(...);
...