Агрегирование результата ForkJoin в Rx - PullRequest
3 голосов
/ 08 декабря 2010

Учитывая этот кусок кода:

var loadAll =
   Observable.ForkJoin(
      service1.FindBooksAsObservable().Select(s => s),
      service2.FindBooksAsObservable().Select(s => s),
      service3.FindBooksAsObservable().Select(s => s)
);

loadAll.Subscribe(
   result =>
   {
      var aggregatedListOfBooks = result.SelectMany(b => b);
   });

Как видите, проблема заключается в том, что каждый метод FindBooksAsObservable () возвращает IObservable<IEnumerable<Book>>, поэтому переменная result в Subscribe () Массив IEnumerable<Book>.

Есть ли другой способ агрегирования результата ForkJoin () ? Я надеялся использовать что-то вроде Merge () вместе с ForkJoin.

Ответы [ 2 ]

4 голосов
/ 08 декабря 2010

Предполагая, что все три службы возвращают список Books, вы можете использовать SelectMany для объединения списков:

IObservable<Book> loadAll = 
    Observable.ForkJoin(
        service1.FindBooksAsObservable().Select(s => s),
        service2.FindBooksAsObservable().Select(s => s),
        service3.FindBooksAsObservable().Select(s => s)
    )
    .Select(books => books.SelectMany(list => list).ToList());

loadAll.Subscribe(
    book => { /* will be called once with a single list of all items */ });

Вы можете удалить вызов ToList(), если вы не хотите, чтобы вывод был списком.

0 голосов
/ 12 июня 2013

Observable.ForkJoin не в последней стабильной версии Reactive Extensions (Rx) (v2.1.30214.0).ForkJoin по состоянию на июнь 2013 года, только в экспериментальных версиях Rx.

Дейв Секстон предложил обойти: http://social.msdn.microsoft.com/Forums/en-US/rx/thread/3cfccb74-9ce3-47dc-94fd-cf60270c1ed5

...