В RxJS карта не выполняет наблюдаемое внутреннее отображение - PullRequest
0 голосов
/ 18 января 2019

Новое в RxJS, но я пытаюсь отобразить один элементный поток в другой, который создает массив после , все внутренние / последующие потоки завершены / загружены. Тем не менее, мои внутренние наблюдаемые, похоже, не выполняются. Они просто возвращаются холодными.

Высокий уровень, мне нужно выполнить http-пост для загрузки списка файлов (в двух разных массивах на две разные конечные точки). Так как они большие, я эмулирую с задержкой в ​​5 секунд. Запросы должны выполняться параллельно, но ограничиваться одновременным выполнением X одновременно (здесь 2). Все это должно быть внутри канала, и канал должен позволять потоку продолжаться после завершения всех сообщений.

https://stackblitz.com/edit/rxjs-pnwa1b

import { map, mapTo, mergeMap, mergeAll, delay, tap, catchError, toArray } from 'rxjs/operators';
import { interval, merge, forkJoin, of, from, range, Observable } from 'rxjs';

const single = "name";

const first = ["abc", "def"];

const second = of("ghi", "jkl", "mno");

of(single)
.pipe(tap(val => console.log(`emit:${val}`)))
.pipe(
  map(claim => 
    merge(
      from(first).pipe(map(photo => of(photo).pipe(delay(5000)))),
      from(second).pipe(map(video => of(video).pipe(delay(5000))))
    )
    .pipe(
      mergeAll(2)
    )
    .pipe(tap(val => console.log(`emit:${val}`)))
    .pipe(toArray())
    .pipe(tap(val => console.log(`emit:${val}`)))
  )
)
.pipe(
catchError(error => {
  console.log("error");
  return Observable.throw(error);
})
)
.subscribe(val => console.log(`final:${val}`));

Внутренняя подписка не будет ждать, пока они не будут завершены. Использование forkJoin не позволило бы мне ограничить количество одновременных загрузок. Как мне это сделать?

Обновление:

Ответ @dmcgrandle был очень полезным и побудил меня внести изменения, которые работают ниже:

import { map, mapTo, mergeMap, mergeAll, delay, tap, catchError, toArray } from 'rxjs/operators';
import { interval, merge, forkJoin, of, from, range, Observable, throwError } from 'rxjs';

const single = "name";

const first = ["abc", "def"];

const second = of("ghi", "jkl", "mno");

of(single)
.pipe(tap(val => console.log(`emit:${val}`)))
.pipe(
  mergeMap(claim =>
    merge(
      from(first).pipe(map(photo => of(photo).pipe(delay(5000)).pipe(tap(val => console.log(`emit:${val}`))))),
      from(second).pipe(map(video => of(video).pipe(delay(5000)).pipe(tap(val => console.log(`emit:${val}`)))))
    )
  ),
  mergeAll(2),
  toArray()
)
.pipe(
catchError(error => {
  console.log("error");
  return throwError(error);
})
)
.subscribe(val => console.log(`final:${val}`));

1 Ответ

0 голосов
/ 19 января 2019

Если я вас правильно понимаю, то думаю, что это решение. Ваша проблема была с первым map, который не будет выполнять внутреннюю подписку, а просто преобразует поток в Observables of Observables, который, кажется, не соответствует вашим ожиданиям. Вместо этого я использовал mergeMap там.

Внутри from я использовал concatMap, чтобы заставить каждую эмиссию из first и second происходить по порядку и ждать, пока одна из них завершится, прежде чем начнется другая. Я также настроил postToEndpoint функции, которые возвращают Observables, чтобы быть ближе к тому, как ваш реальный код, вероятно, будет выглядеть.

StackBlitz Демо

код:

import { mergeMap, concatMap, delay, tap, catchError, toArray } from 'rxjs/operators';
import { merge, of, from, concat, throwError } from 'rxjs';

const single = "name";

const first = ["abc", "def"];

const second = of("ghi", "jkl", "mno");

const postToEndpoint1$ = photo => of(photo).pipe(
  tap(data => console.log('start of postTo1 for photo:', photo)),
  delay(5000),
  tap(data => console.log('end of postTo1 for photo:', photo))
);

const postToEndpoint2$ = video => of(video).pipe(
  tap(data => console.log('start of postTo2 for video:', video)),
  delay(5000),
  tap(data => console.log('end of postTo2 for video:', video))
);

of(single).pipe(
  tap(val => console.log(`initial emit:${val}`)),
  mergeMap(claim => 
    merge(
      from(first).pipe(concatMap(postToEndpoint1$)),
      from(second).pipe(concatMap(postToEndpoint2$))
      )
  ),
  toArray(),
  catchError(error => {
    console.log("error");
    return throwError(error);
  })
).subscribe(val => console.log(`final:`, val));

Надеюсь, это поможет.

...