RxJs объединитьПоследний, не дожидаясь исходных наблюдений для излучения? - PullRequest
0 голосов
/ 04 января 2019

У меня есть две исходные наблюдаемые, из которых мне нужно вычислить некоторые данные, как только один исходная наблюдаемая излучает. Я пытаюсь использовать оператор combineAll(), но он выдает значение только тогда, когда каждый из исходных наблюдаемых объектов генерирует впервые.

Существует ли какой-либо оператор, похожий на combineAll(), который испускается, как только любой из исходных наблюдаемых объектов запускается впервые? Если нет, то какой самый ясный способ сделать это?

Что я пробовал:

const source1$ = service.getSomeData();
const source2$ = service.getOtherData();

combineLatest(
  source1$,
  source2$
).pipe(
  map([source1Data, source2Data] => {
    // this code only gets executed when both observables emits for the first time
    return source1Data + source2Data;
  })
)

Ответы [ 2 ]

0 голосов
/ 07 января 2019

Если я правильно понимаю, вам нужен шаблон, подобный следующей диаграмме:

stream1$ => ------ 1 ------ 12 -----------------------
stream2$ => ------------------------- 30 -------------

result$  => ------ 1 ------ 12 ------ 42 --------------

Если доступно одно значение, выдайте его. Если оба доступны, выведите комбинацию обоих, в этом случае простую сумму (12 + 30 = 42);

Сначала входные потоки, для этого примера я сделал их субъектами, чтобы мы могли передавать данные вручную:

const stream1$ = new Subject();
const stream2$ = new Subject();

Далее мы скомбинируем входные данные, сначала пройдя через оператор startWith. Это гарантирует, что combineLates производит наблюдаемую, которая испускает немедленно - [null, null], чтобы быть точным.

const combined$ = combineLatest(
  stream1$.pipe(startWith(null)),
  stream2$.pipe(startWith(null)),
);

Теперь у вас есть наблюдаемая, которая всегда испускает массивы длины 2, содержащие любую комбинацию ваших данных (числа в этом примере) и ноль, как на следующей диаграмме:

stream1$ | startWith(NULL) => NULL ----------- 1 ----------- 12 ----------------------------
stream2$ | startWith(NULL) => NULL ---------------------------------------- 30 -------------

combined$                     [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------

Наконец, вы можете просмотреть и map этот вывод в желаемом формате: сумма 2 чисел, если оба доступны, или первое доступное значение:

const processedCombinations$ = combined$.pipe(
  map(([data1, data2]) => {
    if (data1 === null) return data2;
    if (data2 === null) return data1;

    return data1 + data2;
  }),
);

Результат:

combined$                  => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
processedCombinations$     => NULL ----------- 1 ----------- 12 ----------- 42 -------------

Остается одна проблема: первое значение, выданное combined$, равно [null, null], из-за чего processedCombinations$ изначально испускает null. Один из способов исправить это - связать другую трубу с помощью skipWhile на processedCombinations$:

const final$ = processedCombinations$.pipe(skipWhile((input) => input === null));

Результат:

combined$                  => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
processedCombinations$     => NULL ----------- 1 ----------- 12 ----------- 42 -------------
final$                     => ---------------- 1 ----------- 12 ----------- 42 -------------

Другой, imo лучше, способ фильтровать поток combined$ до того, как из него будет создан processedCombinations$ (теперь фактически final$):

const combinedFiltered$ = combined$.pipe(
    filter(([first, second])=> first !== null || second !== null),
);

const final$ = combinedFiltered$.pipe(
    map(([data1, data2]) => {
        if (data1 === null) return data2;
        if (data2 === null) return data1;

        return data1 + data2;
    }),
);

Соответствующая диаграмма хорошо показывает, как нерелевантные значения удаляются как можно раньше в иерархии потока:

combined$                  => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
combinedFiltered$          => ---------------- [1, NULL] --- [12, NULL] --- [12, 30] -------
final$                     => ---------------- 1 ----------- 12 ----------- 42 -------------

Приведенные выше диаграммы могут быть получены с помощью этого кода:

final$.subscribe(console.log);

stream1$.next(1);
// logs: 1

stream1$.next(12);
// logs: 12

stream2$.next(30);
// logs: 42

Используемый импорт:

import { combineLatest, Subject } from 'rxjs';
import { filter, map, skipWhile, startWith } from 'rxjs/operators';
0 голосов
/ 04 января 2019

Одним из способов является префикс всех источников с startWith:

combineLatest(
  source1$.pipe(startWith(?)),
  source2$.pipe(startWith(?)),
)

что испускается, как только какая-либо из наблюдаемых исходных текстов испускается в первый раз?

Похоже, вы ищете race(source1$, source2$) наблюдаемый метод создания или, может быть, просто merge(source1$, source2$).pipe(take(1)). Но это действительно зависит от того, что вы хотите сделать.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...