В Rx JS (ES6) Я пытаюсь получить результат в последовательной последовательности операций в одной наблюдаемой.
Я не уверен, должен ли я использовать forkJoin (но Я хотел бы, чтобы операции выполнялись последовательно) или оператор concat (но я хотел бы просто получить уведомление в конце, когда все они будут выполнены).
I TRIED:
forkJoin
sync(): Observable<any> {
return from(this.db.getProducts()).pipe(
map(products => {
if ( !products ) {
return of(true);
}
const batch: Observable<any>[] = [];
for ( const product of products ) {
if ( product.toBeSync ) {
batch.push(this.api.updateProduct(product));
}
}
return forkJoin(batch);
})
);
}
concat
sync(): Observable<any> {
return from(this.db.getProducts()).pipe(
map(products => {
if ( !products ) {
return of(true);
}
const batch: Observable<any>[] = [];
for ( const product of products ) {
if ( product.toBeSync ) {
batch.push(this.api.updateProduct(product));
}
}
return concat(batch);
})
);
}
В обоих случаях я не могу видеть, как наблюдаемые из HttpClient
наблюдения делают их задание (запрос http не отправлен), но я вижу журнал.
Вызванный метод в пакете выглядит следующим образом:
updateProduct(product: Product) {
console.log('calling...');
const options = this.getDefaultRequestOptions();
return this.http.post('update_product', product, options);
}
Я вызываю функцию syn c () например:
this.productService.sync().subscribe(() => {
console.log('sync done');
}, error => this.utils.handleError(error));
Вывод:
(8) calling...
sync done
, но не запускается HTTP-запрос.
Если я делаю то же самое вне forkJoin / concat в карте каналов я вижу отправленный HTTP-запрос.
sync(): Observable<any> {
const product = new Product(null, 'Title', 'Desc.', 'CODE');
return this.api.updateProduct(product);
}
Чего мне не хватает?
--- ОБНОВЛЕНИЕ - РЕШЕНИЕ --- * 104 0 *
sync(): Observable<any> {
return from(this.db.getProducts()).pipe(
flatMap(products => {
if ( !products ) {
return of(true);
}
const batch: Observable<any>[] = [];
for ( const product of products ) {
if ( product.toBeSync ) {
batch.push(this.api.updateProduct(product));
}
}
console.log(batch);
return concat(...batch);
// return forkJoin(batch);
})
);