Слушать и сохранять значения и излучать через x секунд после первого сохраненного значения времени? - PullRequest
1 голос
/ 11 марта 2019

RxJS 4:

Я пытаюсь сохранить некоторые значения и выдать их через x секунд после первого сохраненного значения времени (другими словами, первое значение, полученное из websocket, переключит таймер для сохранения входящихзначения, а затем испустить через х секунд).После выдачи значений таймер остановится и значения не будут отправлены.Пока я не получил новое значение от websocket, цикл начинался снова.

Причина в том, что в настоящее время в моем приложении оно, вероятно, будет выдавать значение каждую наносекунду (данные websocket), и это как бы влияет на производительность.Я думаю сохранить столько значений, сколько я могу получить за x секунду, а затем сгенерировать эти значения вместе, чтобы выполнить несколько пакетных вычислений одновременно.

Я пробовал это, но, похоже, не работает должным образом.

         public testObs = new Observable<any>();
         public bufferStarted = false;
         private subject = new Subject<any>();

         webSocket.onmessage = ((event: any) => {
            this.subject.next(event.data);
            if(!bufferStarted) {
                bufferStarted = true;

                //start the buffer now
                const startInterval = Observable.timer();

                //emit value after 1s and close buffer
                const closingInterval = val => {
                    console.log(`Buffer is open! Emitting value after 1s`)
                    bufferStarted = false;
                    return Observable.interval(1000);
                }
                this.testObs = this.subject.bufferToggle(startInterval, closingInterval);
             }
         }

В компоненте я подписываюсь на testObs.subscribe((e) => ... ).Пример: значение отправляется через веб-сокет, и запускается таймер, который открывает буфер на 1 секунду.В течение 1 секунды из веб-сокета поступает еще 50 значений.Я ожидал 51 значения (длина массива 51), полученные в компоненте.Однако я получил, что наблюдаемое не определено.Помощь приветствуется.

1 Ответ

0 голосов
/ 25 марта 2019

bufferTime считается хорошим выбором для вас, он группирует элемент потока в указанное время. Но он по-прежнему генерирует пустой массив, даже если элемент не генерируется из источника, поэтому вы можете использовать filter, чтобы опустить пустой массив. Вот демо:

// assume a web socket stream
ws$;

const notEmpty = arr => Boolean(arr.length);

const grouped$ = ws$.pipe(bufferTime(1000), filter(notEmpty));

grouped$.subscribe(group => {
// group is an array of item from ws$
// do something here
});
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...