Как разделить поток и рекомбинировать подпотоки, чтобы получить окончательные результаты в RxJs - PullRequest
0 голосов
/ 11 июля 2019

У меня есть исходный поток, который может отправлять два типа сообщений. Я хотел бы разделить их на два отдельных потока и, как только исходный поток завершится, повторно объединить их окончательное значение излучения (или получить неопределенное значение, если его не существует).

* 1003 например *

const split1$ = source$.pipe(
     filter(m) => m.kind === 1, 
     mergeMap(m) => someProcessing1());
const split2$ = source$.pipe(
     filter(m) => m.kind === 2, 
     mergeMap(m) => someProcessing2());
forkJoin(split1$, split2$).subscribe(
(output1, output2) => console.log(output1, output2));

Проблема в том, что ничто не гарантирует, что и split1 $, и split2 $ будут выдавать значения. Если это произойдет, forkJoin никогда не излучает. Чем я могу заменить forkJoin для выдачи значения всякий раз, когда исходный поток завершается.

1 Ответ

1 голос
/ 11 июля 2019

О разделении потока: https://www.learnrxjs.io/operators/transformation/partition.html

По поводу "emit on complete", не могли бы вы вместо этого просто использовать complete callback? .subscribe(() => console.log('Emitted"), null, () => console.log('Completed'));

В противном случае вы можете использовать оператор startWith, чтобы убедиться, что что-то было отправлено.

const [evens, odds] = source.pipe(partition(val => val % 2 === 0));
evens = evens.pipe(startWith(undefined)); // This will emit undefined before everything, so forkJoin will surely emit

Добавьте startWith в конструкторе forkJoin:

forkJoin(evens.pipe(startWith(undefined)), odds.pipe(startWith(undefined)))
  .subscribe(console.log))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...