Как объединить вызовы MergeMap Observables и вернуть только одно значение для всей наблюдаемой - PullRequest
4 голосов
/ 30 апреля 2019

У меня есть этот сценарий в машинописи / угловой с rxjs 6.5:

   main(){

        const properties = ['session', 'user'];
        const source: Observable<any> = from(properties);

        source
            .pipe(
                mergeMap(key => this.getKey().map((value) => ({key: key, value: value}))),
                tap((result) => {
                    // sending the result elsewhere;
                }),
            ).subscribe(
            (result) => {
                console.log('Final result ->', result);
            }
        );

        console.log('\n');

    }

    getKey(): Observable<any> {
        // Mock function that returns an observable that emits 1 value
        const observable = from(['test']);

        return observable;
    }

Вывод:

Final result -> {key: "session", value: "test"}
Final result -> {key: "user", value: "test"}

1-й вопрос: Как мне вернуть самым элегантным образом при подписке на источник только 1 значение с объединенными результатами внутренних наблюдаемых?

Мой требуемый вывод с подпиской таким образом (потому что я хочу, чтобы эта комбинированная операция была в конвейере), будет:

(...).subscribe(
(result) => {console.log('Final Result:', result}
)

OUTPUT:

Final result -> [{key: "session", value: "test"}, {key: "user", value: "test"}]

2-й вопрос Если меня не волнует результат внутренних наблюдаемых, как мне вернуть только 1 значение или как узнать, когда все внутренние наблюдаемые были завершены?

Заранее спасибо.

Ответы [ 4 ]

1 голос
/ 30 апреля 2019

Вот аннотированный пример, чтобы помочь прояснить ваши вопросы о процессе подписки, о котором вы спрашиваете.

1:

Как указано в другом ответе, оператор reduce - это то, что вы хотите включить в свой source конвейер. Ключевой деталью reduce является то, что он излучает только после завершения соответствующего наблюдаемого источника. Если вместо этого вы хотите, чтобы излучение завершилось, эти внутренние наблюдаемые завершены, тогда scan подходит. Другое отличие от последнего состоит в том, что он не требует завершения источника.

Q2:

С этим вопросом обратитесь к моему примеру ниже и представьте каждый аргумент конвейера обработки как время жизни одного запроса. Здесь завершение неявно. Это происходит после обработки последнего значения внутренних наблюдаемых.

Однако, если нет никаких границ с внутренними наблюдаемыми, то знать, когда все внутренние наблюдаемые полны, невозможно. В таком случае вы обнаружите, что reduce() не будет работать.

const { from, of, Subject } = rxjs;
const { mergeMap, map, tap, reduce, scan } = rxjs.operators;

// Use a subject to simulate processing.
// Think of each argument as a request to the processing pipeline below.
const properties = new Subject();

// Establish processing pipeline
const source = properties.pipe(
  // `mergeMap` here flattens the output value to the combined inner output values
  mergeMap(props =>
    // Each item inside the argument should be piped as separate values
    from(props).pipe(
      // `mergeMap` here flattens the output value to `{ key, value }`
      mergeMap(key =>
        of('test').pipe(
          map(value => ({ key, value })),
        ),
      ),
      // Unlike `scan`, `reduce` only emits upon "completion".
      // Here, "completion" is implicit - it is after the last
      // element of `from(props)` has been processed.
      reduce((a, i) => [...a, i], []),
    )
  ),  
);

// Subscribe to the pipeline to observe processing.
source.subscribe(console.log);

// Trigger a processing request with an argument
properties.next(['session', 'user']);

// Trigger another processing request
properties.next(['session', 'user']);
<script src="https://unpkg.com/rxjs@6.5.1/bundles/rxjs.umd.min.js"></script>
0 голосов
/ 30 апреля 2019

Q1: Вам нужен toArray - он объединит все значения вашего потока в один массив:

toArray example

Q2: Чтобы опустить все значения в потоке и выдать значение по завершении

concat(
  source$.pipe(ignoreElements()),
  of(true)
)

emit a value upon source completion

См. " Извлечь значение при завершении источника " пример на детской площадке

0 голосов
/ 30 апреля 2019

Используйте уменьшить

.pipe(
  reduce((results, result) => {
    results.push(result);
    return results;
  }, [])
)

Результирующая наблюдаемая будет излучаться только после того, как все остальные будут излучены, а полученное значение будет массивом всех результатов.

0 голосов
/ 30 апреля 2019

1-й вопрос, вы можете использовать scan для обработки и накопления вывода

 mergeMap(key => from(this.getKey())),
 scan((acc,curr) =>acc.concat([{key: curr.key, value: curr.value}]),[])),

2n Вопрос

используйте оператор first(), чтобы получить только один вывод из внутренней наблюдаемой прикрепите finalize() к внутренней наблюдаемой, которая будет активирована после завершения внутренней наблюдаемой. или используйте last(), чтобы получить последний накопленный результат

 mergeMap(key => from(this.getKey())),
 scan((acc,curr) =>acc.concat([{key: curr.key, value: curr.value}]),[])),
 first()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...