Перехват ошибок на объединенной наблюдаемой и продолжение остальных в RxJS - PullRequest
1 голос
/ 23 февраля 2020

Вариант использования: Я пытаюсь реализовать процедуру сохранения в javascript, используя Rx JS. У меня есть метод, названный для примера, и в нем я хочу сначала сохранить объект в локальной БД, а затем также отправить его на сервер. Так, например, у меня есть этот код:

set({ obj, group = 'main' }) {
    // tell the LocalDB to store obj in appropriate name and group (appropriate table)
    const storageObservable = this.parealDB.storage.set(this.name, group, { ...obj, status: 0 }); // status 0 means that the records need to be synced (sent) to the server
    // tell the WebClient to sync obj to appropriate name and group (appropriate table)
    const webclientObservable = this.parealDB.webclient.set(this.name, group, obj); // try to send to server and then set status=1 on the localDB / this request should not be failed

    storageObservable.pipe(
        catchError(error => of({
            action: 'STORAGE_ERROR',
            data: error,
        }))
    );

    webclientObservable.pipe(
        tap(result => this.parealDB.storage.set(this.name, group, { _id: obj._id, status: 1 })),
        catchError(error => of({
            action: 'WEBCLIENT_ERROR',
            data: error,
        }))
    );

    return merge(storageObservable, webclientObservable);
}

И я хочу вернуть объединенную наблюдаемую вне метода set и при обнаружении ошибки каждого внутреннего задания, также продолжить другие.

Проблема: Я проверил эту идею с некоторыми наблюдаемыми наблюдениями. Во-первых, взгляните на пример:

//emit every 2.5 seconds
const first = interval(200).pipe(
  map(val => "" + val + " A"),
  take(3)
);
//emit every 2 seconds
const second = interval(100).pipe(
  mergeMap(val => {if(val === 1) return throwError('salam'); return of('' + val + " B");}),
  // tap(val => console.log('tapped ', val)),
  take(3)
)
// .pipe(
//   onErrorResumeNext(),
// )

//emit every 1.5 seconds
const third = interval(300).pipe(
  map(val => val + " C"),
  take(3)
);

merge(first, second, third)
// .pipe(
//   catchError(error => of(error)),
// )
.subscribe({
  next: console.log,
  error: console.error,
  complete: () => console.log('completed')
});

Теперь, когда второй сбой, он разбивает всю работу и вывод выглядит так:

0 B
0 A
salam // (error)

И если я попытаюсь раскомментировать канал с ошибкой перехвата в объединенной наблюдаемой, он поместит salam как обычный вывод, а также завершит задание, но все равно замкнет его: внутреннее наблюдение, прежде чем объединить их вместе. Таким образом, раскомментирование упомянутого оператора со второй наблюдаемой (что создает проблему) приводит к следующему выводу:

0 B
0 A
0 C
1 A
2 A
1 C
2 C
completed

Это очень близко к тому, что я ищу, но из метода set я не смог ловить ошибки (слушать ошибки)!

Вопрос: Так что, я скучаю по использованию наблюдаемых Rx JS или есть лучший способ справиться с этой ситуацией?

1 Ответ

0 голосов
/ 23 февраля 2020

Вы можете поддерживать поток живым, но используя оператор repeat (), но используйте его осторожно, вы должны убедиться, что исходный поток является горячо наблюдаемым, или вы можете столкнуться с бесконечным циклом.

streamA.pipe(
catch(e=>of(e)),
repeat())
...