У меня есть несколько методов бесконечного генератора, в том числе некоторые длительно и бесконечно долго работающие генераторы.
IEnumerable<T> ExampleOne() {
while(true) // this one blocks for a few seconds at a time
yield return LongRunningFunction();
}
IEnumerable<T> ExampleTwo() {
while(true) //this one blocks for a really long time
yield return OtherLongRunningFunction();
}
Моя цель - создать бесконечную последовательность, которая объединяет элементы из двух примеров.Вот что я попробовал, используя PLINQ:
IEnumerable<T> combined = new[] { ExampleOne(), ExampleTwo() }
.AsParallel()
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.SelectMany(source => source.GetRequests());
Кажется, что это правильно объединяет два IEnumerables в новый, с элементами из IEnumerable
# 1 и # 2, доступными всякий раз, когда они появляются в любом из двухsource IEnumerables
:
//assuming ExampleTwo yields TWO but happens roughly 5 times
//less often then ExampleOne
Example output: one one one one one TWO one one one one one one TWO
Однако, кажется, что иногда (обычно после многих часов пробега) OtherLongRunningFunction()
будет идти в течение длительного периода времени, не возвращаясь, и подусловия, которые трудно воспроизвести, последовательность combined
будет блокировать ее, а не продолжать возвращать результаты первого LongRunningFunction
.Кажется, что хотя комбинированный параллельный запрос начинался с двух потоков, он решил переключиться на один поток позже.
Моя первая мысль была: «Это, вероятно, работа для RX Observable.Merge
, а не для PLINQ."Но я был бы признателен за оба ответа, которые показывают правильные альтернативные способы решения этой ситуации, а также объяснения о механике того, как PLINQ может изменять степень параллелизма через несколько часов после начала запроса.