Если я правильно понимаю, вам нужен шаблон, подобный следующей диаграмме:
stream1$ => ------ 1 ------ 12 -----------------------
stream2$ => ------------------------- 30 -------------
result$ => ------ 1 ------ 12 ------ 42 --------------
Если доступно одно значение, выдайте его. Если оба доступны, выведите комбинацию обоих, в этом случае простую сумму (12 + 30 = 42);
Сначала входные потоки, для этого примера я сделал их субъектами, чтобы мы могли передавать данные вручную:
const stream1$ = new Subject();
const stream2$ = new Subject();
Далее мы скомбинируем входные данные, сначала пройдя через оператор startWith. Это гарантирует, что combineLates производит наблюдаемую, которая испускает немедленно - [null, null]
, чтобы быть точным.
const combined$ = combineLatest(
stream1$.pipe(startWith(null)),
stream2$.pipe(startWith(null)),
);
Теперь у вас есть наблюдаемая, которая всегда испускает массивы длины 2, содержащие любую комбинацию ваших данных (числа в этом примере) и ноль, как на следующей диаграмме:
stream1$ | startWith(NULL) => NULL ----------- 1 ----------- 12 ----------------------------
stream2$ | startWith(NULL) => NULL ---------------------------------------- 30 -------------
combined$ [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
Наконец, вы можете просмотреть и map
этот вывод в желаемом формате: сумма 2 чисел, если оба доступны, или первое доступное значение:
const processedCombinations$ = combined$.pipe(
map(([data1, data2]) => {
if (data1 === null) return data2;
if (data2 === null) return data1;
return data1 + data2;
}),
);
Результат:
combined$ => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
processedCombinations$ => NULL ----------- 1 ----------- 12 ----------- 42 -------------
Остается одна проблема: первое значение, выданное combined$
, равно [null, null]
, из-за чего processedCombinations$
изначально испускает null
. Один из способов исправить это - связать другую трубу с помощью skipWhile
на processedCombinations$
:
const final$ = processedCombinations$.pipe(skipWhile((input) => input === null));
Результат:
combined$ => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
processedCombinations$ => NULL ----------- 1 ----------- 12 ----------- 42 -------------
final$ => ---------------- 1 ----------- 12 ----------- 42 -------------
Другой, imo лучше, способ фильтровать поток combined$
до того, как из него будет создан processedCombinations$
(теперь фактически final$
):
const combinedFiltered$ = combined$.pipe(
filter(([first, second])=> first !== null || second !== null),
);
const final$ = combinedFiltered$.pipe(
map(([data1, data2]) => {
if (data1 === null) return data2;
if (data2 === null) return data1;
return data1 + data2;
}),
);
Соответствующая диаграмма хорошо показывает, как нерелевантные значения удаляются как можно раньше в иерархии потока:
combined$ => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
combinedFiltered$ => ---------------- [1, NULL] --- [12, NULL] --- [12, 30] -------
final$ => ---------------- 1 ----------- 12 ----------- 42 -------------
Приведенные выше диаграммы могут быть получены с помощью этого кода:
final$.subscribe(console.log);
stream1$.next(1);
// logs: 1
stream1$.next(12);
// logs: 12
stream2$.next(30);
// logs: 42
Используемый импорт:
import { combineLatest, Subject } from 'rxjs';
import { filter, map, skipWhile, startWith } from 'rxjs/operators';