RxJS Наблюдаемый выброс при завершении предыдущего - PullRequest
0 голосов
/ 06 июня 2018

У меня есть динамический массив объектов, которые мне нужно обрабатывать синхронно.Я буду знать только количество объектов в массиве во время выполнения.Обработка каждого объекта состоит из вызова метода, который будет генерировать уведомления о ходе выполнения по завершении своей работы.Время обработки для каждого объекта будет значительно варьироваться (от 30-40 секунд до ~ 5 минут).

RxJS версия 5.5.6

Так что в основном мне нужно:

  1. Объект Emit 1
  2. Объект процесса 1
  3. Объект Emit 2
  4. Объект процесса 2

(продолжить для всех n объектов в массиве.

Я не знаю, как заставить наблюдаемое ожидание испустить следующий объект, пока не будет вызван обратный вызов complete предыдущего элемента.

Я посмотрел на группу операторов, и ближайший из них - merge, но он все еще обрабатывает все сайты одновременно.Вот где сейчас находится мой код:

В моем сервисе:

const allObs: Array<Observable<string>> = [];
const siteObserver: Observer<string> = {
  next: (s: string) => console.log(s),
  error: (error: string) => {},
  complete: () => console.log('complete')
};
activeSites.map((site: MatchSite) => {
  const obs: Observable<string> = Observable.create((masterObserver: Observer<string>) => {
    site.process().subscribe((x: string) => console.log(x));
    masterObserver.next(site.tag + ' processing');
  });
  allObs.push(obs);
});
const all: any = Rx.Observable.merge(...allObs);
const sub: Subscription = all.subscribe(siteObserver);

В объекте сайта process выглядит следующим образом (на данный момент просто симуляция интервала / тайм-аута, чтобы попытаться получитьэто работает без усложнения реального кода обработки):

process(): Observable<string> {
    const obs: Observable<string> = Observable.create((observer: Observer<string>) => {
      const int: any = setInterval(() => {
        observer.next('downloading ' + this.tag);
      }, 500);
      setTimeout(() => {
        clearInterval(int);
        console.log('download done: ' + this.tag);
        observer.complete();
      }, 2000);
    });
    return obs;
  }

В двух словах, я вижу это:

...
downloading siteA
downloading siteB
downloading siteC
downloading siteD
downloading siteA
downloading siteB
downloading siteC
downloading siteD
...

Когда то, что я хочу, это:

...
downloading siteA
downloading siteA
downloading siteB
downloading siteB
downloading siteC
downloading siteC
downloading siteD
downloading siteD
...

Может кто-нибудь указать мне правильное направление?

Спасибо,

TTE

ОБНОВЛЕНИЕ

Когда я добавляю параллелизм от 1 до merge или использую concat первый сайт обрабатывается дважды, другие сайты не обрабатываются.Теперь я действительно запутался ...

Я не могу понять, как flatMap будет работать в этом сценарии, но я все еще работаю над этим.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...