Новое в RxJS, но я пытаюсь отобразить один элементный поток в другой, который создает массив после , все внутренние / последующие потоки завершены / загружены. Тем не менее, мои внутренние наблюдаемые, похоже, не выполняются. Они просто возвращаются холодными.
Высокий уровень, мне нужно выполнить http-пост для загрузки списка файлов (в двух разных массивах на две разные конечные точки). Так как они большие, я эмулирую с задержкой в 5 секунд. Запросы должны выполняться параллельно, но ограничиваться одновременным выполнением X одновременно (здесь 2). Все это должно быть внутри канала, и канал должен позволять потоку продолжаться после завершения всех сообщений.
https://stackblitz.com/edit/rxjs-pnwa1b
import { map, mapTo, mergeMap, mergeAll, delay, tap, catchError, toArray } from 'rxjs/operators';
import { interval, merge, forkJoin, of, from, range, Observable } from 'rxjs';
const single = "name";
const first = ["abc", "def"];
const second = of("ghi", "jkl", "mno");
of(single)
.pipe(tap(val => console.log(`emit:${val}`)))
.pipe(
map(claim =>
merge(
from(first).pipe(map(photo => of(photo).pipe(delay(5000)))),
from(second).pipe(map(video => of(video).pipe(delay(5000))))
)
.pipe(
mergeAll(2)
)
.pipe(tap(val => console.log(`emit:${val}`)))
.pipe(toArray())
.pipe(tap(val => console.log(`emit:${val}`)))
)
)
.pipe(
catchError(error => {
console.log("error");
return Observable.throw(error);
})
)
.subscribe(val => console.log(`final:${val}`));
Внутренняя подписка не будет ждать, пока они не будут завершены. Использование forkJoin не позволило бы мне ограничить количество одновременных загрузок. Как мне это сделать?
Обновление:
Ответ @dmcgrandle был очень полезным и побудил меня внести изменения, которые работают ниже:
import { map, mapTo, mergeMap, mergeAll, delay, tap, catchError, toArray } from 'rxjs/operators';
import { interval, merge, forkJoin, of, from, range, Observable, throwError } from 'rxjs';
const single = "name";
const first = ["abc", "def"];
const second = of("ghi", "jkl", "mno");
of(single)
.pipe(tap(val => console.log(`emit:${val}`)))
.pipe(
mergeMap(claim =>
merge(
from(first).pipe(map(photo => of(photo).pipe(delay(5000)).pipe(tap(val => console.log(`emit:${val}`))))),
from(second).pipe(map(video => of(video).pipe(delay(5000)).pipe(tap(val => console.log(`emit:${val}`)))))
)
),
mergeAll(2),
toArray()
)
.pipe(
catchError(error => {
console.log("error");
return throwError(error);
})
)
.subscribe(val => console.log(`final:${val}`));