Как использовать буфер rx js с takeWhile - PullRequest
1 голос
/ 21 июня 2020

Я работаю на webrt c. Приложение отправляет icecandidates на внутренний сервер Firestore. Проблема в том, что вызов сигнального сервера выполняется несколько раз, поскольку onicecandidate запускается несколько раз. Я хочу собрать все ледяные кандидаты и сделать один звонок на сервер сигнализации. Идея состоит в том, чтобы буферизовать все события до завершения iceGathering. Эта попытка ниже не работает

this.pc = new RTCPeerConnection(iceServers);
const source: Observable<any> =  fromEvent(this.pc, 'icecandidate');
const takeWhile$ = source
        .pipe(
            takeWhile(val=> val.currentTarget.iceGatheringState === 'gathering'
    ))
const buff = source.pipe(buffer(takeWhile$));
    buff.subscribe(() => {
        // this.pc.onicecandidate = onicecandidateCallback;
    })

1 Ответ

0 голосов
/ 22 июня 2020

Метод 1:

Вы почти у цели.

takeWhile$ принимает значения и выдает их, пока выполняется условие. Таким образом, в buff всякий раз, когда takeWhile$ испускает значение, buff генерирует буфер из icecandidate событий.

Таким образом, вам нужно испустить только одно значение в takeWhile$.

Итак, вам нужен оператор takeLast(), который будет выдавать только последнее значение.

Когда вы помещаете takeLast(1) в takeWhile$, он выдает только последнее значение, а в buff последнее переданное значение приводит к создание буфера icecandidate событий.

this.pc = new RTCPeerConnection(iceServers);

const source: Observable<any> = fromEvent(this.pc, "icecandidate");

const takeWhile$ = source.pipe(
  takeWhile(val => val.currentTarget.iceGatheringState === "gathering"),
  takeLast(1)
);

const buff = source.pipe(buffer(takeWhile$));

buff.subscribe((bufferValues) => {

   // bufferValues has a buffer of icecandidate events

  // this.pc.onicecandidate = onicecandidateCallback;
});

У вас будет доступ к буферу icecandidate событий в подписке как bufferValues в приведенном выше коде.

Метод 2:

Вы также можете использовать оператор reduce для достижения того же сценария

this.pc = new RTCPeerConnection(iceServers);

const source: Observable<any> = fromEvent(this.pc, "icecandidate");

const takeWhile$ = source.pipe(
  takeWhile(val => val.currentTarget.iceGatheringState === "gathering"),
  reduce((acc, val) => [...acc,val], [])
);

takeWhile$.subscribe((bufferValues) => {

  // bufferValues has a buffer of icecandidate events

 // this.pc.onicecandidate = onicecandidateCallback;
})
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...