Angular / RxJS: вложенные сервисные вызовы с наблюдаемыми - PullRequest
0 голосов
/ 14 сентября 2018

Я новичок в наблюдаемых RxJS и пытаюсь разрешить довольно простой вариант использования.

В службе я сначала выполняю вызов http, который возвращает элемент (как наблюдаемый).Элемент содержит массив идентификаторов, некоторые из которых повторяются.Для каждого отдельного идентификатора мне нужно вызвать другой сервис http (снова возвращает наблюдаемый) и добавить его возвращаемое значение к исходному элементу вместо соответствующего идентификатора.Эти вызовы должны происходить параллельно.Наконец, после завершения каждого звонка мой сервис должен возвращать наблюдаемую исходную вещь, теперь с ее подэлементами на месте.

Чтобы дать лучшую идею, это то, что будет выглядеть с обещаниями, а нечем наблюдаемые:

MyService() {
    return HttpGetMainItem()
        .then(item => {
            var promises = _.uniq(item.subItems)
                 .map(sid => HttpGetSubItem(sid)
                            .then(subItem => {
                                // add to matching item.subItems
                             }));
            // wait for all promises to complete and return main item
            return Promise.all(promises).then(() => item);
        });
}

Каков наилучший способ выполнить эту работу с наблюдаемыми?

РЕДАКТИРОВАТЬ: из ответов, кажется, я не очень ясно.Пример с обещаниями просто для ясности, в моем случае http-вызовы на самом деле являются HttpClient.get от Angular, поэтому они возвращают наблюдаемые - я ищу все с помощью наблюдаемых.

Ответы [ 3 ]

0 голосов
/ 14 сентября 2018

Я считаю, что вам нужно что-то вроде этого:

import { from, forkJoin } from 'rxjs';
import { switchMap } from 'rxjs/operators';

itemWithSubItems$ = from(HttpGetMainItem()).pipe(
    switchMap(item => {
        const promises = _.uniq(item.subItems).map(item => HttpGetSubItem(item.sid));
        return forkJoin(promises)
            .map(resultArray => item.subItems = resultArray)
            .map(_ => item);
    })
);

Сначала выбирает основной элемент.Затем используйте forkJoin, чтобы разрешить все подзапросы и обогатить основной элемент.После этого просто верните основной предмет.

0 голосов
/ 14 сентября 2018

Вот один из способов выполнить вышеизложенное с помощью rxjs.

  1. Преобразовать обещание в наблюдаемое с помощью from.
  2. Во внешнем канале вызовите switchMap, чтобыВы можете назвать другую наблюдаемую и вернуть ее как результат внешней наблюдаемой.Вы переключаете контексты выполнения.
  3. Внутри switchMap создайте карту подэлементов, как вы делали раньше, а затем используйте forkJoin, чтобы создать Observable для всех обещаний элементов.ForkJoin выдаст массив всех результатов, как только все обещания будут выполнены.Это как promise.all.
  4. Добавить элементы, которые вы планировали сделать ранее, и вернуть исходный элемент.

Код

from(HttpGetMainItem()).pipe(
    switchMap(item => 
        forkJoin(_.uniq(item.subItems).map(sid => HttpGetSubItem(sid)))
            .pipe(
                map(results => { 
                    /* add results to matching items.subItems and */
                    return item;
                })
            )
     )
);

Мне кажется, это выглядит немного неуклюже из-за необходимости сохранения корневого элемента и требуемой вложенности.Вы можете использовать параметр селектора switchMap, чтобы объединить внешнюю и внутреннюю наблюдаемую.Вы можете использовать этот параметр вместо логики, которую вы имели в map(), и поскольку результаты обеих наблюдаемых передаются, вам больше не понадобится дальнейшее вложение.

from(HttpGetMainItem()).pipe(
    switchMap(
        item => forkJoin(_.uniq(item.subItems).map(sid => HttpGetSubItem(sid))),
        (item, results) => { 
             /* add results to matching items.subItems and */
             return item;
        }
    )
);
0 голосов
/ 14 сентября 2018

Может быть, вы могли бы использовать такую ​​библиотеку, как async.each -> https://caolan.github.io/async/docs.html#each (возможно, каждая серия).

будет выглядеть примерно так:

async.each(array, (obj, cb) => {
  observable with obj in parameter and with subscriber result : 
  cb(err, subscriberRes);
}, (err, res) => {
  console.log(res);
}
...