Подписка срабатывает только один раз после ошибки - PullRequest
0 голосов
/ 26 мая 2020

Я ладья ie в rx js, и я пытаюсь выяснить, что произошло, в следующем коде.

Действительно, чтобы проиллюстрировать идею, я подготовил два простые обещания (resolvePromise и rejectPromise), одно разрешает обещание, а другое отклоняет. Экземпляр «Subject» используется для запуска этих двух обещаний (передаваемых по конвейеру с помощью mergeMap и forkJoin). В конце добавляются две отдельные инструкции (источник $ .next ('xxx')) в надежде активировать подписку дважды.

Но, наконец, только первый «source $ .next ('1') "запускает подписку, а следующий" source $ .next ('2') ", кажется, ничего не делает.

Я предполагаю, что такое поведение вызвано обещанием отклонить, которое буквально« выдает исключение ». Но мне интересно, как я могу исправить этот код, чтобы второй «source $ .next ('2')» также запускал подписку.

Большое спасибо

import { of,forkJoin,Subject } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

const resolvePromise = val =>
  new Promise(resolve => resolve(`resolve value: ${val}`));

const rejectPromise = err =>
  Promise.reject(`reject error: ${err}`);

const source$ = new Subject();

source$
  .pipe(mergeMap(val => {
    return forkJoin([
      resolvePromise(val), 
      rejectPromise(val)
      ]);
  }))
  .subscribe(
     console.log, 
     console.error
  );

  source$.next('1');
  source$.next('2');

Обновление

Основываясь на предложении Андрея Гатея ниже, чтобы решить эту проблему, я решил передать catchError после forkJoin и поместить обработку ошибок внутри.

source$.pipe(
  mergeMap(val => {
    return forkJoin([
      resolvePromise(val),
      rejectPromise(val)
    ])
      .pipe(
        catchError(err => {
          // your error handling business logic
          console.error(err);
          return empty();
        })
      );
  }))
  .subscribe(console.log);

Ответы [ 2 ]

1 голос
/ 27 мая 2020

отклонить обещание, которое буквально «выдает исключение»

Я бы сказал, что вы правы, судя по тому, как Rx JS обрабатывает обещания внутри:

promise.then(
  (value) => {
    if (!subscriber.closed) {
      subscriber.next(value);
      subscriber.complete();
    }
  },
  (err: any) => subscriber.error(err) // !
)

Источник .

Также стоит упомянуть, что, как вы можете видеть из приведенного выше фрагмента, как только обещание разрешается в первый раз, это разрешается будет передано значение и уведомление complete.

Часть complete очень важна для оператора forkJoin.

forkJoin подписывается на все предоставленные наблюдаемые и ждет , пока все не будет завершено. Он отправит массив со значениями , только если наблюдаемые будут иметь испускаемые в хотя бы один раз .

Когда один наблюдаемый испускает error forkJoin немедленно отправит это error уведомление дальше в цепочке.


Вот мой подход:

ource$
  .pipe(mergeMap(val => {
    return forkJoin([
      resolvePromise(val), 
      from(rejectPromise(val)).pipe(catchError(err => of(err)))
      ]);
  }))
  .subscribe(
     console.log, 
     console.error
  );

Мы используем from, чтобы мы может перехватить эту ошибку обещания на ранней стадии, что позволяет нам использовать catchError и использовать наблюдаемый объект, который отправит значение, а затем complete уведомление (of(err)).

0 голосов
/ 27 мая 2020

необходимо повторно подписаться через retry или repeat.

import { of,forkJoin,Subject } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

const resolvePromise = val =>
  new Promise(resolve => resolve(`resolve value: ${val}`));

const rejectPromise = err =>
  Promise.reject(`reject error: ${err}`);

const source$ = new Subject();

source$.pipe(
  mergeMap(val => forkJoin([
    resolvePromise(val), 
    rejectPromise(val)
  ])),
  catchError(() => EMPTY),
  repeat(),
).subscribe(
  console.log, 
  console.error
);

source$.next('1');
source$.next('2');
...