RxJs ограничивают количество звонков по подписке - PullRequest
2 голосов
/ 09 мая 2019

Я слушаю ResizeObserver (или, скорее, polyfill ), чтобы вызвать перерисовку холста fabricjs , который довольно ресурсоемкий.Поэтому я хочу ограничить количество перерисовок, запускаемых событием resize.

Я пытался реализовать эту функцию с помощью RxJs:

  1. Перерисовка запускается сразу (в первый раз)
  2. Перерисовка не запускается в течение n миллисекунд
  3. Перерисовка запускается после n миллисекунд
  4. Перерисовка запускается для последнего события изменения размера

RxJsпредлагает некоторые встроенные операторы на основе времени .Однако все они имеют свои недостатки:

  • auditTime : имеет начальную задержку
  • debounceTime : начальная задержка и никогда не срабатывает, когда выпродолжайте изменять размер
  • throttleTime : возможно, игнорируются последние несколько событий, которые имеют решающее значение

Я попытался объединить / присоединить эти операторы, но это привело к двойному вызову послеn секунд и другие проблемы.Есть ли простой способ RxJs сделать это?Я полагаю, что это возможно, запустив / очистив функцию тайм-аута.

Ответы [ 3 ]

1 голос
/ 09 мая 2019

Вы можете использовать throttleTime с опцией trailing для отправки последнего события.Последнее событие будет отправлено с заданной задержкой, а не сразу после прекращения изменения размера.

Поскольку вы также хотите, чтобы первое событие было отправлено, вам дополнительно требуется опция по умолчанию leading.Если оба параметра установлены на true, два события будут запускаться непосредственно одно за другим в конце и начале каждого нового временного интервала.Чтобы предотвратить это, вы можете добавить debounceTime с небольшим временным интервалом, например, 50 мс.

import { fromEvent, asyncScheduler } from 'rxjs';
import { throttleTime, debounceTime, map } from 'rxjs/operators';

const source = fromEvent(window, 'resize').pipe(map(e => e.target['innerWidth']));
const width = source.pipe(
  throttleTime(1000, asyncScheduler, { leading: true, trailing: true }),
  debounceTime(50)
)

https://stackblitz.com/edit/typescript-qsjhvu

1 голос
/ 06 июня 2019

Вы можете написать свою собственную функцию и отслеживать последний отправленный и полученный элемент в локальной переменной. Чтобы прочитать время в Rx, вы должны использовать scheduler.now(), это делает ваш код тестируемым. С switchMap вы можете эмулировать поведение газа, используя switchMap(()=>of(e).pipe(delay(duration))). Используя это, мы можем в особом случае создать ситуацию, когда последний элемент был так давно, что мы хотим сразу же создать следующий элемент.

Это привело к следующему решению:

function limitTime(duration, scheduler = async) {
  return (src) => new Observable(ob => {
      var last = scheduler.now() - duration;
      return src.pipe(
          switchMap(e => {
            var last2 = last;
            last = scheduler.now();
            if(last - last2 > duration) {
              return of(e);
            }
            return of(e).pipe(delay(duration + last2 - last, scheduler),
                             tap(() => last = scheduler.now()));
          })
        ).subscribe(ob);
    })
}

Посмотрите его в действии с некоторыми тестами здесь .

1 голос
/ 09 мая 2019

Первое, что приходит на ум, - это написать свой собственный конвейерный оператор: https://github.com/ReactiveX/rxjs/blob/master/doc/pipeable-operators.md#build-your-own-operators-easily

В качестве альтернативы, вы можете попробовать что-то вроде этого:

const source$:Observable<MyEvent> = getEventObservable(); // Get it from fabric

merge(
    source$.pipe(throttle(100)), // max once in fixed interval
    source$.pipe(debounce(100)), // debounce to get the last value
).pipe(
   distinctUntilChanged() // In case both fire at the same time
).subscribe(val => {
    // Your code
})
...