RxJS - буферизует уникальные значения и испускает, когда никакие другие значения не были переданы в течение x секунд? - PullRequest
0 голосов
/ 08 мая 2018

Пользователь вводит значения в форме, и событие генерируется каждый раз, когда пользователь редактирует определенное поле, причем значением является поле, которое он редактировал.

Например, пользователь, набравший 3 раза в поле описания, а затем два раза в поле имени, будет выглядеть как

"description" => "description" => "description" => "name" => "name" => ...

Я хочу буферизовать уникальные значения и выдавать их в виде массива, когда пользователь прекращает вводить x количество секунд. Значение может снова появиться в другом окне буфера.

По сути, это отслеживание того, какие поля были обновлены, когда пользователь прекратил печатать, и связь с сервером для сохранения отредактированных значений.


Пока у меня есть это, которое излучает каждые 3000 мс, плюс это не предотвращает дублирование при буферизации, но вместо этого мы "дедуплицируем" массив впоследствии.

   this.update$
      .bufferTime(3000)
      .filter(buffer => buffer.length > 0)
      .map(buffer => [...new Set(buffer)])
      .subscribe(x => console.log(x));

Таким образом, он должен прослушивать до тех пор, пока значение не будет передано, а затем буферизовать уникальные значения до , пока больше не будет выдано никаких значений в течение x секунд, а затем испустить буфер и повтори. Как этого добиться?

Ответы [ 2 ]

0 голосов
/ 08 мая 2018

Это может быть альтернативная версия:

const { Observable } = Rx;

const log = (prefix) => (...args) => { console.log(prefix, ...args); };

const inputs = document.querySelectorAll('input');

const updates$ = Observable
  .fromEvent(inputs, 'input')
  .pluck('target', 'id')
;

// wait x ms after last update
const flush$ = updates$.debounceTime(3000);

const buffered$ = updates$
  // use `distinct` without `keySelector`, but reset with flush$
  .distinct(null, flush$)
  
  // flush the buffer using flush$ as `notifier`
  .buffer(flush$)
;

buffered$.subscribe(log('buffered$ =>'));
<script src="https://unpkg.com/@reactivex/rxjs@^5/dist/global/Rx.min.js"></script>

<div><input type="text" placeholder="foo" id="input.foo"></div>
<div><input type="text" placeholder="bar" id="input.bar"></div>
<div><input type="text" placeholder="baz" id="input.baz"></div>
0 голосов
/ 08 мая 2018

Возможно, мой вопрос не был достаточно ясен, в любом случае мне удалось решить его так: (на случай, если это кому-то поможет)

Чтобы буфер выдавал только , когда поток молчал в течение 3 секунд, я запускаю новый таймер каждый раз, когда пользователь что-то вводит (событие, отправленное на update$), и использую switchMap для отменить предыдущий.

this.update$
  .buffer(this.update$.switchMap(x => Observable.timer(3000)))
  .filter(buffer => buffer.length > 0)
  .map(buffer => [...new Set(buffer)])
  .subscribe(console.log);

Затем, чтобы буфер был уникальным сам по себе, а не дедуплицировал его вручную, мне пришлось создать собственный оператор uniqueBuffer.

this.update$
  .uniqueBuffer(this.update$.switchMap(x => Observable.timer(3000)))
  .filter(buffer => buffer.length > 0)
  .subscribe(console.log);

function uniqueBuffer(emitObservable) {
  return Observable.create(subscriber => {
    const source = this;
    const uniqueBuffer = new Set();

    const subscription = source.subscribe(value => {
      uniqueBuffer.add(value);
    });

    emitObservable.subscribe(emit => {
      subscriber.next([...uniqueBuffer]);
      uniqueBuffer.clear();
    })
  })
}

Observable.prototype.uniqueBuffer = uniqueBuffer;
...