Rx JS получить массив выбросов в групповом времени. Триггеры Firebase - PullRequest
0 голосов
/ 26 марта 2020

Я новичок в мире Rx JS, и я немного растерялся с этим. Надеюсь, что кто-то может мне помочь.

У меня есть наблюдаемый источник (Firebase через AngularFire), который постоянно выдает мне много данных (до 50 или 80 выбросов в окне 2s) в случайное время пики, так как это Это то, что замедляет производительность моего проекта, и я подумал, что правильный способ справиться с этим - сгруппировать выбросы в массиве, а затем выполнить транзакцию со всеми полученными данными и вставить их в хранилище.

Результат, который я ищу, выглядит примерно так:

Учитывая то, что я бы поставил «удерживать» количество времени 3 с, я бы хотел следующий результат :

           1s     1.5s
--> 30 --> 60 --> 100

          1s           2s
--> 5 --> 1 --> 50 --> 70
  1. [30, 60, 100] -> в интервале времени 1,5 с *
  2. [5, 1, 50, 70] -> в интервале времени 2 с

Значения в массиве - это выбросы, полученные за это конкретное c время, начиная с первого полученного выброса. По истечении указанного c промежутка времени он «перезапустит» инициализацию в следующей партии выбросов (что может фактически произойти через 1 секунду или через 2 часа, но затем интервал будет срабатывать в течение 2 с, когда выбросы получены)

То, что я до сих пор пробовал, - это использовать Window и Buffer, возможно, я их не правильно использую, или я просто тупой, но не могу найти результат, который я только что объяснил.

filter((snapshot) => { if (snapshot.payload.val().reference) { return snapshot; } }),
window(interval(2000)),
mergeAll(),
withTransaction((snapshots:[]) => {
   snapshots.forEach(snapshot => {
     if (snapshot.type === 'child_changed') {

       this.store.add(snapshot.key, snapshot.val());

     } else if (snapshot.type === 'child_changed') {

       this.store.replace(snapshot.key, snapshot.val());

     } else if (snapshot.type === 'child_removed') {

       this.store.remove(snapshot.key);

     }
   })
})

Я даже не знаю, возможно ли это с Rx JS (я так думаю. Я видел много интересных вещей вокруг), но любые предложения или руководство для того, чтобы сделать это, было бы очень признательно.

Заранее большое спасибо!

Примечание: withTransaction - пользовательский оператор.

1 Ответ

1 голос
/ 26 марта 2020

не совсем то, что вы хотите, но похоже, что вы хотите bufferTime?

const source = timer(0, 500);
const buffered = source.pipe(bufferTime(3000));
buffered.subscribe(val => console.log(val));

, при этом все значения, собранные в течение периода буфера, будут отправляться в виде массива каждые 3 секунды.

blitz demo: https://stackblitz.com/edit/rxjs-vpu97e?file=index.ts

в вашем примере, я думаю, вы бы просто использовали его как:

filter((snapshot) => snapshot.payload.val().reference), // this is all you need for filter
bufferTime(2000),
withTransaction((snapshots:[]) => {
   ...
})
...