GroupBy затем ObserveOn теряет элементы - PullRequest
1 голос
/ 13 июня 2011

Попробуйте это в LinqPad:

Observable
    .Range(0, 10)
    .GroupBy(x => x % 3)
    .ObserveOn(Scheduler.NewThread)
    .SelectMany(g => g.Select(x => g.Key + " " + x))
    .Dump()

Результаты явно недетерминированы, но в каждом случае мне не удается получить все 10 пунктов.Моя текущая теория состоит в том, что предметы проходят через сгруппированные наблюдаемые, ненаблюдаемые как маршалы конвейера в новый поток.

Ответы [ 2 ]

1 голос
/ 14 июня 2011

Linqpad не знает, что вы запускаете все эти потоки - он сразу достигает конца кода (помните, что операторы Rx не всегда действуют синхронно, это идея!), Ждет несколько миллисекунд,затем заканчивается удалением AppDomain и всех его потоков (которые еще не догнали).Попробуйте добавить Thread.Sleep до конца, чтобы дать новым потокам время наверстать упущенное.

Кроме того, Scheduler.NewThread - очень неэффективный планировщик, EventLoopScheduler (создать ровно один поток) или Scheduler.TaskPool (используйте пул TPL, как если бы вы создали задачу длякаждый элемент) гораздо более эффективны (конечно, в этом случае, поскольку у вас есть только 10 элементов, Планировщик является лучшим!)

0 голосов
/ 14 июня 2011

Похоже, что проблема заключается во времени между началом подписки на новую группу в операции GroupBy и задержкой внедрения новой подписки.Если вы увеличите количество итераций с 10 до 100, через некоторое время вы должны начать видеть некоторые результаты.

Кроме того, если вы измените GroupBy на .Where (x => x% 3 == 0), вы, вероятно, заметите, что никакие значения не будут потеряны, поскольку динамическая подписка на группы IObservable не должнаинициализировать новых наблюдателей.

...