У меня есть динамический массив объектов, которые мне нужно обрабатывать синхронно.Я буду знать только количество объектов в массиве во время выполнения.Обработка каждого объекта состоит из вызова метода, который будет генерировать уведомления о ходе выполнения по завершении своей работы.Время обработки для каждого объекта будет значительно варьироваться (от 30-40 секунд до ~ 5 минут).
RxJS версия 5.5.6
Так что в основном мне нужно:
- Объект Emit 1
- Объект процесса 1
- Объект Emit 2
- Объект процесса 2
(продолжить для всех n объектов в массиве.
Я не знаю, как заставить наблюдаемое ожидание испустить следующий объект, пока не будет вызван обратный вызов complete
предыдущего элемента.
Я посмотрел на группу операторов, и ближайший из них - merge
, но он все еще обрабатывает все сайты одновременно.Вот где сейчас находится мой код:
В моем сервисе:
const allObs: Array<Observable<string>> = [];
const siteObserver: Observer<string> = {
next: (s: string) => console.log(s),
error: (error: string) => {},
complete: () => console.log('complete')
};
activeSites.map((site: MatchSite) => {
const obs: Observable<string> = Observable.create((masterObserver: Observer<string>) => {
site.process().subscribe((x: string) => console.log(x));
masterObserver.next(site.tag + ' processing');
});
allObs.push(obs);
});
const all: any = Rx.Observable.merge(...allObs);
const sub: Subscription = all.subscribe(siteObserver);
В объекте сайта process
выглядит следующим образом (на данный момент просто симуляция интервала / тайм-аута, чтобы попытаться получитьэто работает без усложнения реального кода обработки):
process(): Observable<string> {
const obs: Observable<string> = Observable.create((observer: Observer<string>) => {
const int: any = setInterval(() => {
observer.next('downloading ' + this.tag);
}, 500);
setTimeout(() => {
clearInterval(int);
console.log('download done: ' + this.tag);
observer.complete();
}, 2000);
});
return obs;
}
В двух словах, я вижу это:
...
downloading siteA
downloading siteB
downloading siteC
downloading siteD
downloading siteA
downloading siteB
downloading siteC
downloading siteD
...
Когда то, что я хочу, это:
...
downloading siteA
downloading siteA
downloading siteB
downloading siteB
downloading siteC
downloading siteC
downloading siteD
downloading siteD
...
Может кто-нибудь указать мне правильное направление?
Спасибо,
TTE
ОБНОВЛЕНИЕ
Когда я добавляю параллелизм от 1 до merge
или использую concat
первый сайт обрабатывается дважды, другие сайты не обрабатываются.Теперь я действительно запутался ...
Я не могу понять, как flatMap
будет работать в этом сценарии, но я все еще работаю над этим.