Выдавать действия перед `toArray` - Redux Observable - PullRequest
0 голосов
/ 25 сентября 2018

Я использую Redux Observable, и мне нужно решить проблему синхронизации при запуске действий из эпопей.

У меня есть массив элементов, которые я хочу зациклить для выполнения вызовов AJAX для каждого из них.Сразу после получения ответа AJAX я хочу выполнить некоторые действия.После того, как все ответы AJAX возвращаются для каждого элемента в исходном массиве, я хочу запустить больше действий.

Как я могу заставить эти действия запускаться сразу после истечения срока действия timer, даже если исходный массивне закончил цикл?

const someEpic = action$ => (
    action$
    .pipe(
        ofType(SOME_ACTION),
        switchMap(({ payload }) => (
            from(payload) // This is an array of items
            .pipe(
                mergeMap(() => (
                    timer(5000) // This is my AJAX call
                    .pipe(
                        map(loadedAction),
                    )
                )),
                toArray(),
                map(anotherAction),
            )
        ))
    )
)

Ответы [ 2 ]

0 голосов
/ 28 сентября 2018

Для этого требуется 2 эпоса.

const ajaxCallerEpic = action$ => (
    action$
    .pipe(
        ofType(AJAX_ACTION),
        switchMap(({
            payload,
            payloadId,
        }) => (
            merge(
                from(payload) // This is an array of items
                .pipe(
                    mergeMap(() => (
                        timer(5000) // This is my AJAX call
                        .pipe(
                            switchMap(() => (
                                merge(
                                    of(loadedAction),
                                    of(
                                        sentData(
                                            payloadId
                                        )
                                    ),
                                )
                            )),
                        )
                    )),
                ),
            )
        ))
    )
)

const ajaxResponsesEpic = action$ => (
    action$
    .pipe(
        ofType(AJAX_ACTION),
        switchMap(({
            payload,
            payloadId,
        }) => (
            action$
            .pipe(
                ofType(SENT_DATA_ACTION),
                filter(({ id }) => (
                    id === payloadId
                )),
                bufferCount(
                    payload
                    .length
                ),
                map(anotherAction),
            )
        ))
    )
)

Важной частью является вторая SENT_DATA_ACTION.Я передал уникальный идентификатор, когда он вызывается, чтобы убедиться, что вы слушаете правильный.Если вы не отправите их все, он будет прослушивать, пока открыт браузер.Вы всегда можете добавить тайм-аут на внутреннего слушателя, чтобы убедиться, что он завершен.Другая проблема может возникнуть, если ajaxResponsesEpic настроит прослушиватель action$ позже, чем когда ajaxCallerEpic выполнит вызовы AJAX.

Это вполне может оказаться условием race.Чтобы учесть эти проблемы, сначала нужно выполнить ajaxResponsesEpic, чтобы он настроил прослушиватель действий и одновременно запускает вызовы AJAX после его прослушивания.

Примерно так:

const ajaxCallerEpic = action$ => (
    action$
    .pipe(
        ofType(AJAX_READY_ACTION),
        switchMap(({
            payload,
            payloadId,
        }) => (
            merge(
                from(payload) // This is an array of items
                .pipe(
                    mergeMap(() => (
                        timer(5000) // This is my AJAX call
                        .pipe(
                            switchMap(() => (
                                merge(
                                    of(loadedAction),
                                    of(
                                        sentAjaxData(
                                            payloadId
                                        )
                                    ),
                                )
                            )),
                        )
                    )),
                ),
            )
        ))
    )
)

const ajaxResponsesEpic = action$ => (
    action$
    .pipe(
        ofType(AJAX_ACTION),
        switchMap(({
            payload,
            payloadId,
        }) => (
            merge(
                (
                    action$
                    .pipe(
                        ofType(SENT_AJAX_DATA_ACTION),
                        filter(({ id }) => (
                            id === payloadId
                        )),
                        bufferCount(
                            payload
                            .length
                        ),
                        map(anotherAction),
                    )
                ),
                (
                    of(ajaxReadyAction)
                ),
            )
        ))
    )
)
0 голосов
/ 27 сентября 2018

Вероятно, самый простой способ на самом деле - генерировать действия, используя tap, где вы хотите.Это при условии, что у вас есть доступ к магазину.Например:

tap(result => this.store.dispatch(...))

Однако более "Rx" способ разделить цепочку с использованием multicast, а затем немедленно переиздать одну часть (это процесс загрузки), а другую половину соединить с toArray() всобрать все результаты, которые затем превратятся в другое действие, сигнализирующее, что загрузка завершена.

import { range, Subject, of } from 'rxjs';
import { multicast, delay, merge, concatMap, map, toArray } from 'rxjs/operators';

const process = v => of(v).pipe(
  delay(1000),
  map(p => `processed: ${p}`),
);

range(1, 5)
  .pipe(
    concatMap(v => process(v)),
    multicast(
      () => new Subject(), 
      s => s.pipe(
        merge(s.pipe(toArray()))
      )
    ),
  )
  .subscribe(console.log);

Демонстрационная версия: https://stackblitz.com/edit/rxjs6-demo-k9hwtu?file=index.ts

...