Это работает для меня:
IObservable<Item> ret = _workers.Aggregate(
Observable.Return(item),
(rs, w) =>
from r in rs
from p in w.Process(r)
select p);
Пожалуйста, имейте в виду, что такого рода совокупность наблюдаемых - как в вашем вопросе, так и в моем ответе - может быстро вызвать проблемы с памятью (то есть переполнение стека). В моих тестах я мог заставить 400 рабочих работать, но 500 вызвали сбой.
Вам лучше изменить IProcessor
, чтобы не использовать наблюдаемые, и реализовать свою наблюдаемую так:
interface IProcessor{
Item Process(Item item);
}
var f =
_workers.Aggregate<IProcessor, Func<Item, Item>>(
i => i,
(fs, p) => i => p.Process(fs(i)));
var ret = Observable.Start(() => f(item), Scheduler.ThreadPool);
При таком подходе я могу получить более 20 000 вложенных рабочих до переполнения стека, и результаты почти мгновенны до этого уровня.