Мне трудно сделать сложную наблюдаемую трубу, и я был бы признателен, если бы кто-то мог помочь мне в этом ...
Контекст
У меня есть один поток данных, который дает мне через Bluetooth некоторые значения, которые являются фреймами данных, которые я должен декодировать.
Это BehaviorSubject, называемый RX$
.
Сейчас на RX$
, иногда я получаю Мгновенные данные (INST) и иногда данные истории (HIST). С помощью INST я получаю, помимо прочего, устройство, которое отправляет данные о версии и модели. Я успешно сгенерировал наблюдаемую, которая способна вычислить мне объект JSON с версией и моделью устройства и который не излучает, пока у него нет обоих, давайте назовем его deviceVersionModelStream$
Теперь, с другой стороны, я получаю кадры данных HIST навалом в потоке, который мы будем называть historyStream$
, и, поскольку данных много, я использовал bufferTime(2000)
, чтобы создать массив данных и полагаться на объем встроенной базы данных. вставить (вместо одного за другим).
До сих пор это работало хорошо ...
Новый вариант использования
Теперь мой клиент добавил новое правило, у него старый тип устройства который не может дать мне некоторые данные для конкретного случая c, но, используя тот же шаблон, я знаю, что он мне дает.
Поэтому мне нужно иметь версию и модель устройства до декодирование кадра и вставка в базу данных.
Мой вопрос: как я могу отложить historyStream$
вхождений, если deviceVersionModelStream$
испускает один раз (это ГОРЯЧИЙ, который также используется в других местах) и когда это происходит? Я хочу генерировать какой-то JSON объект с необработанным фреймом и версией / моделью.
Но ТАКЖЕ постепенно отправлять эту информацию, чтобы не перегружать мои массовые вставки в базу данных, как раньше делал мой bufferTime (2000)?.
Я пытаюсь что-то сделать с буфером, mergeMap, задержкой, но мне трудно добиться этого ...
Может быть, кто-то, кто сильн с RX, может мне помочь?
Спасибо серия