Несколько SelectMany в Rx - PullRequest
       27

Несколько SelectMany в Rx

0 голосов
/ 26 сентября 2011

У меня есть такой интерфейс:

interface IProcessor{
    IObservable<Item> Process(Item item);
}

У меня есть массив рабочих:

IProcessor[] _workers = ....

Я хочу передать предмет всем работникам:

var ret = Observable.Return(item);
for (var i = 0; i < _workers.Length; i++)
{
    int index = i;
    ret = ret
        .SelectMany(r => _workers[index].Process(r))
    ;
}
return ret;

Я не слишком доволен тем, как это выглядит - есть ли более чистый способ?

Ответы [ 2 ]

2 голосов
/ 27 сентября 2011

Это работает для меня:

IObservable<Item> ret = _workers.Aggregate(
    Observable.Return(item),
    (rs, w) =>
        from r in rs
        from p in w.Process(r)
        select p);

Пожалуйста, имейте в виду, что такого рода совокупность наблюдаемых - как в вашем вопросе, так и в моем ответе - может быстро вызвать проблемы с памятью (то есть переполнение стека). В моих тестах я мог заставить 400 рабочих работать, но 500 вызвали сбой.

Вам лучше изменить IProcessor, чтобы не использовать наблюдаемые, и реализовать свою наблюдаемую так:

interface IProcessor{
    Item Process(Item item);
}

var f =
    _workers.Aggregate<IProcessor, Func<Item, Item>>(
            i => i,
            (fs, p) => i => p.Process(fs(i)));

var ret = Observable.Start(() => f(item), Scheduler.ThreadPool);

При таком подходе я могу получить более 20 000 вложенных рабочих до переполнения стека, и результаты почти мгновенны до этого уровня.

0 голосов
/ 26 сентября 2011

Может быть, что-то вроде этого:?

var item = new Item();
_workers
  .ToObservable()
  .SelectMany(worker => worker.Process(item))
  .Subscribe(item => ...);

Я сделал предположение, что рабочие могут обрабатывать элемент параллельно.

PS Если вы хотите последовательную обработку, это будет

var item = new Item();
_workers
  .ToObservable()
  .Select(worker => worker.Process(item))
  .Concat()
  .Subscribe(item => ...);
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...