rxjs асинхронные обновления в Observable - PullRequest
0 голосов
/ 20 февраля 2019

Каков наилучший способ обработки асинхронных обновлений в середине потока Observable.

Допустим, есть 3 наблюдаемые:

Obs1 (получает данные из API) -> каналы к Obs2Obs2 (преобразовывает данные) -> каналы в Obs3 Obs3 (отправляет преобразованные данные)

(Реальное приложение более сложное, и есть причины, по которым оно не выполняется ни в одной Observable, это всего лишь простой пример).

Это все работает хорошо, если это линейный синхронный путь.

Но у нас также есть асинхронные сообщения, которые изменят вывод Obs2.

3 сценария, которые я спрашиваюо том, что: - мы выбираем данные и проходим Obs1, Obs2 & Obs3 - мы получаем сообщение для внесения изменений, проходим Obs2 & Obs3 - мы получаем другое сообщение о внесении изменений, которое также должно применяться к изменениям изпредыдущее сообщение через Obs2 и Obs3

Основная проблема здесь заключается в том, что существуют различные типы асинхронных сообщений, которые изменят результат Obs2, но все они должныЯ все еще знаю, каким был предыдущий результат Obs2 (поэтому все другие изменения в сообщениях, которые произошли раньше, все еще применяются)

Я попытался использовать switchMap в Obs2 со сканированием в Obs1, например так:

obs1

const obs1$ = obs1$.pipe(
  // this returns a function used in the reducer.
  map((data) => (prevData) => 'modifiedData',
  scan((data, reducer) => reducer(betsMap), {})
)

obs2

const obs2$ = obs1$.pipe(
  switchMap(data =>
    someChange$.pipe(map(reducer => reducer(data)))
  )
)

, где someChange$ - BehaviorSubject, применяющий изменение с использованием другой функции редуктора.

Это прекрасно работает для асинхронного сообщения# 1, который вносит некоторые изменения.Но когда приходит сообщение № 2 и требуется другое изменение, первое изменение теряется.

изменения, которые должны быть в "prevData" в obs1 $, всегда неопределены, потому что это происходит до того, как сообщение будет применено.

Как я могу взять вывод из obs2 $ и применить к нему асинхронные обновления, которые запоминают все предыдущие обновления?(таким образом, чтобы я мог очистить все изменения, если это необходимо)

1 Ответ

0 голосов
/ 25 февраля 2019

Так что, если я правильно понял вопрос, есть две проблемы, которые решает этот вопрос:

Первый: Как кэшировать последние 2 значения из потока.

scan определенноправильный путь, если эта логика кэша необходима в более чем одном месте / файле, я бы выбрал пользовательский оператор канала, например, следующий

function cachePipe() {
  return sourceObservable =>
    sourceObservable.pipe(
      scan((acc, cur) => {
        return acc.length === 2 ? [...acc.slice(1), cur] : [...acc, cur];
      }, [])
    );
}

cachePipe всегда будет возвращать последние 2 значенияпередается через поток.

...
.pipe(
      cachePipe()
     )

Секунда: Как получить доступ к данным из нескольких потоков одновременно при событии потока

Здесь оператор создания combineLatest rxjs может помочь вам,

combineLatest(API$, async1$ ,async2$,async3$)
  .pipe(
    // here I have access to an array of the last emitted value of all streams
    // and the data can be passed to `Obs2` in your case
   )

В pipe я могу связать любое количество наблюдаемых, что решает вторую проблему.

Примечание:

combineLatest необходимо для всех потоков внутри него испускать один раз , прежде чем оператор начнет испускать их объединенное значение , один обходной путь должен использовать startWith оператор с вашими входными потоками, другой способ сделать это с помощью ранализируя данные через BehaviorSubject -s.

Вот демонстрация в CodeSandbox , в которой используются стратегии cachePipe() и startWith для объединения источника (Obs1) с асинхроннымнаблюдаемые, которые изменят данные.

...