Вариант использования: Я пытаюсь реализовать процедуру сохранения в javascript, используя Rx JS. У меня есть метод, названный для примера, и в нем я хочу сначала сохранить объект в локальной БД, а затем также отправить его на сервер. Так, например, у меня есть этот код:
set({ obj, group = 'main' }) {
// tell the LocalDB to store obj in appropriate name and group (appropriate table)
const storageObservable = this.parealDB.storage.set(this.name, group, { ...obj, status: 0 }); // status 0 means that the records need to be synced (sent) to the server
// tell the WebClient to sync obj to appropriate name and group (appropriate table)
const webclientObservable = this.parealDB.webclient.set(this.name, group, obj); // try to send to server and then set status=1 on the localDB / this request should not be failed
storageObservable.pipe(
catchError(error => of({
action: 'STORAGE_ERROR',
data: error,
}))
);
webclientObservable.pipe(
tap(result => this.parealDB.storage.set(this.name, group, { _id: obj._id, status: 1 })),
catchError(error => of({
action: 'WEBCLIENT_ERROR',
data: error,
}))
);
return merge(storageObservable, webclientObservable);
}
И я хочу вернуть объединенную наблюдаемую вне метода set и при обнаружении ошибки каждого внутреннего задания, также продолжить другие.
Проблема: Я проверил эту идею с некоторыми наблюдаемыми наблюдениями. Во-первых, взгляните на пример:
//emit every 2.5 seconds
const first = interval(200).pipe(
map(val => "" + val + " A"),
take(3)
);
//emit every 2 seconds
const second = interval(100).pipe(
mergeMap(val => {if(val === 1) return throwError('salam'); return of('' + val + " B");}),
// tap(val => console.log('tapped ', val)),
take(3)
)
// .pipe(
// onErrorResumeNext(),
// )
//emit every 1.5 seconds
const third = interval(300).pipe(
map(val => val + " C"),
take(3)
);
merge(first, second, third)
// .pipe(
// catchError(error => of(error)),
// )
.subscribe({
next: console.log,
error: console.error,
complete: () => console.log('completed')
});
Теперь, когда второй сбой, он разбивает всю работу и вывод выглядит так:
0 B
0 A
salam // (error)
И если я попытаюсь раскомментировать канал с ошибкой перехвата в объединенной наблюдаемой, он поместит salam как обычный вывод, а также завершит задание, но все равно замкнет его: внутреннее наблюдение, прежде чем объединить их вместе. Таким образом, раскомментирование упомянутого оператора со второй наблюдаемой (что создает проблему) приводит к следующему выводу:
0 B
0 A
0 C
1 A
2 A
1 C
2 C
completed
Это очень близко к тому, что я ищу, но из метода set я не смог ловить ошибки (слушать ошибки)!
Вопрос: Так что, я скучаю по использованию наблюдаемых Rx JS или есть лучший способ справиться с этой ситуацией?