Как связать наблюдаемые действия, чтобы испускать синхронно, а затем испускать конечное значение - PullRequest
1 голос
/ 10 января 2020

У меня есть метод, который принимает наблюдаемое в качестве параметра и возвращает другое наблюдаемое (если вы работаете с Redux-Observable, у него есть сигнатура этого типа function (action$: Observable<Action>): Observable<Action>;

В этом методе я необходимо, чтобы наблюдаемое сделало следующее, когда получило действие 'POST_AND_GET_REQUEST':

  1. Использует часть действия полезной нагрузки для синхронного выполнения запроса POST для каждого объекта в массиве
  2. После завершения этих синхронных вызовов выполните запрос GET
  3. Когда запрос GET вернет ответ, а затем верните 'POST_AND_GET_SUCCESS' действие

Я создал стекаблиц, чтобы проиллюстрировать, что У меня так далеко: https://stackblitz.com/edit/rxjs-a2pwb8?file=index.ts

Но вот и фрагмент кода:

import { from, of, Observable } from 'rxjs';
import { filter, delay, mapTo, mergeMap, concatMap, tap, switchMap } from 'rxjs/operators';

// GOAL
// synchronously post each period, when that's finished perform a get request
// postPeriods should only console the final POST_AND_GET_SUCCESS string that all was successful

interface ActionModel {
  type: string;
  payload: any;
}

const postPeriods = (action$: Observable<ActionModel>) => action$.pipe(
  filter(action => action.type === 'POST_AND_GET_REQUEST'),
  mergeMap(action =>
    from(action.payload).pipe(
      concatMap(period => of(`${period} post success`).pipe(
        delay(1000),
        tap(() => console.log(`${period} post success`)),
        // this is wrong - we only want this to emit once
        switchMap(response => of('get success').pipe(map => of('POST_AND_GET_SUCCESS')))
      ))
    ),
  )
);

const postRequestAction = from([{ type: 'POST_AND_GET_REQUEST', payload: ['period 1', 'period 2'] }]);

// this should be a single 'POST_AND_GET_SUCCESS'
postPeriods(postRequestAction).subscribe(val => console.log(val));

Я знаю, что там, где у меня есть switchMap, неправильно, но я не могу понять, как получить цепочку от from наблюдаемого или оператора mergeMap, чтобы получить желаемый результат.

Ответы [ 2 ]

0 голосов
/ 10 января 2020

Если для запроса GET не требуются какие-либо данные из ответов на запросы публикации, вы можете использовать concat, чтобы добавить действие get в конце

...
mergeMap(action =>
  concat(
    from(action.payload).pipe(
      concatMap(period => of(`${period} post success`).pipe(
        delay(1000),
        tap(() => console.log(`${period} post success`)),
      ))
    ),
    of('get success').pipe(map => of('POST_AND_GET_SUCCESS'))
  )
)
...
0 голосов
/ 10 января 2020

В пределах postPeriod epi c я бы порекомендовал вам завершить действие, возвращая одну наблюдаемую, а не возвращая несколько наблюдаемых.

Сначала вы можете определить массив (observableList ), который будет состоять из списка наблюдаемых. Затем вы можете создать массив sh с наблюдаемыми из delayed. И последнее, но не менее важное: вы перемещаете switchMap() logi c так, что оно будет выполнено только при возврате observableList.

Это обеспечит выполнение действия 'get success' только один раз, после того как запросы завершены, и наблюдаемые возвращаются.

import { from, of, Observable } from 'rxjs';
import { filter, delay, mapTo, mergeMap, concatMap, tap, switchMap } from 'rxjs/operators';

// GOAL
// synchronously post each period, when that's finished perform a get request
// postPeriods should only console the final POST_AND_GET_SUCCESS string that all was successful

interface ActionModel {
  type: string;
  payload: any;
}

const postPeriods = (action$: Observable<ActionModel>) => action$.pipe(
  filter(action => action.type === 'POST_AND_GET_REQUEST'),
  mergeMap(action => {
    // replace the generics with a suitable typing
    const observableList: Observable<any>[] = [];

    const delayed = from(action.payload).pipe(
      concatMap(period => of(`${period} post success`)
        .pipe(
          delay(1000),
          tap(() => console.log(`${period} post success`)),
        )),
    )
    observableList.push(delayed);

    return observableList;
  }),
  switchMap(response => of('get success').pipe(map => of('POST_AND_GET_SUCCESS')))
);

const postRequestAction = from([{ type: 'POST_AND_GET_REQUEST', payload: ['period 1', 'period 2'] }]);

// this should be a single 'POST_AND_GET_SUCCESS'
postPeriods(postRequestAction).subscribe(val => {
  console.log('end:', val);
});
...