Rx JS буфер и задержка, пока один другой наблюдаемый не испустит первый раз - PullRequest
0 голосов
/ 25 марта 2020

Мне трудно сделать сложную наблюдаемую трубу, и я был бы признателен, если бы кто-то мог помочь мне в этом ...

Контекст

У меня есть один поток данных, который дает мне через Bluetooth некоторые значения, которые являются фреймами данных, которые я должен декодировать.

Это BehaviorSubject, называемый RX$.

Сейчас на RX$, иногда я получаю Мгновенные данные (INST) и иногда данные истории (HIST). С помощью INST я получаю, помимо прочего, устройство, которое отправляет данные о версии и модели. Я успешно сгенерировал наблюдаемую, которая способна вычислить мне объект JSON с версией и моделью устройства и который не излучает, пока у него нет обоих, давайте назовем его deviceVersionModelStream$

Теперь, с другой стороны, я получаю кадры данных HIST навалом в потоке, который мы будем называть historyStream$, и, поскольку данных много, я использовал bufferTime(2000), чтобы создать массив данных и полагаться на объем встроенной базы данных. вставить (вместо одного за другим).

До сих пор это работало хорошо ...

Новый вариант использования

Теперь мой клиент добавил новое правило, у него старый тип устройства который не может дать мне некоторые данные для конкретного случая c, но, используя тот же шаблон, я знаю, что он мне дает.

Поэтому мне нужно иметь версию и модель устройства до декодирование кадра и вставка в базу данных.

Мой вопрос: как я могу отложить historyStream$ вхождений, если deviceVersionModelStream$ испускает один раз (это ГОРЯЧИЙ, который также используется в других местах) и когда это происходит? Я хочу генерировать какой-то JSON объект с необработанным фреймом и версией / моделью.

Но ТАКЖЕ постепенно отправлять эту информацию, чтобы не перегружать мои массовые вставки в базу данных, как раньше делал мой bufferTime (2000)?.

Я пытаюсь что-то сделать с буфером, mergeMap, задержкой, но мне трудно добиться этого ...

Может быть, кто-то, кто сильн с RX, может мне помочь?

Спасибо серия

1 Ответ

0 голосов
/ 25 марта 2020

1 Общая идея - переключение назад и вперед между буферизованным и небуферизованным

Похоже, что вам нужно сначала поставить паузу (буфер) historyStream$, а затем отменить его, когда выдается deviceVersionModelStream$. Способ приостановки / отмены приостановки потоков:

merge(
  source$.pipe(bufferToggle(pauseOn$, () => pauseOff$)),
  source$.pipe(windowToggle(pauseOff$, () => pauseOn$))
).pipe(mergeAll())

См. Также: https://medium.com/@kddsky / pauseable-observables-in-rx js -58ce2b8c7dfd

Ваш speci c case

Для вашего случая это будет:

const versionModel$ = deviceVersionModelStream$.pipe(take(1));
merge(
  historyStream$.pipe(bufferToggle(of(0), v => versionModel$)),
  historyStream$.pipe(windowToggle(versionModel$, v => NEVER))
).pipe(
  mergeAll(),
  bufferTime(2000)
);

Если вы хотите, чтобы выход deviceVersionModelStream$ был доступен для каждого эмитта, вы можете использовать combineLatest. Если несколько подписок на historyStream$ и deviceVersionModelStream$ являются проблемой, вы можете заранее использовать share.

const versionModel$ = deviceVersionModelStream$.pipe(share(), take(1));
const historyStreamShared$ = historyStream$.pipe(share());
combineLatest(
  versionModel$,
  merge(
    historyStreamShared$.pipe(bufferToggle(of(0), v => versionModel$)),
    historyStreamShared$.pipe(windowToggle(versionModel$, v => NEVER))
  ).pipe(
    mergeAll(),
    bufferTime(2000)
  )
).subscribe(console.log);

https://stackblitz.com/edit/rxjs-oyctsg

Редактировать (если вам нужен буфер только один раз в начале)

В вашем случае вам не нужно больше использовать буфер после его выключения, то есть один раз вы переключаетесь с bufferToggle на поток windowToggle, вам не нужно переключаться обратно на поток bufferToggle.

Это позволяет использовать немного простой подход, используя только buffer вместо bufferToggle и windowToggle.

2 Общая идея - буфер только в начале

source$ = dataStream.pipe(share()) // make sure this is a hot observable
pauseOff$ = timer(5000) // make sure this observable emits once and completes

concat(
  source$.pipe(buffer(pauseOff$), mergeAll()), // start with a buffered stream
  source$ // switch to the unbuffered stream when pauseOff$ emits and completes the previous stream
)

Ваши данные c case

const historyStream$ = timer(0, 100).pipe(share(), take(200));
const versionModel$ = of("version-model").pipe(delay(5000), take(1));

combineLatest(
  versionModel$,
  concat(
    historyStream$.pipe(buffer(versionModel$), mergeAll()),
    historyStream$
  ).pipe(
    bufferTime(2000)
  )
).subscribe(console.log);

https://stackblitz.com/edit/rxjs-2ptrux

...