Rx JS Звоните не чаще одного раза в секунду, но не теряйте звонки - PullRequest
1 голос
/ 23 апреля 2020

В основном я бы хотел создать очередь. Что-то вроде

const queue = new BehaviorSubject([])
queue.subscribe((args) => someCall(args))

, где я мог бы позвонить

queue.next({arg1, arg2, arg3})

в нескольких местах, иногда очень быстро друг за другом. Я не могу использовать throttle или debounce, потому что я не могу потерять промежуточные вызовы. Мне нужно, чтобы каждый звонок вызывался, но не более 1 в секунду. Если бы двое стреляли в течение секунды друг от друга, нужно было бы подождать 1 секунду. Если 3 сработают в течение секунды, один будет ждать одну секунду, другой будет ждать 2 секунды.

Ответы [ 2 ]

0 голосов
/ 24 апреля 2020

Я недавно оказался в такой же ситуации. API, который я потреблял, мог принимать только 4 запроса в секунду.

Это то, что я придумал.

Труба rateLimit

import { asyncScheduler, BehaviorSubject, timer, MonoTypeOperatorFunction, Observable } from 'rxjs'
import { filter, map, mergeMap, take } from 'rxjs/operators'

export function rateLimit<T>(
  count: number,
  slidingWindowTime: number,
  scheduler = asyncScheduler,
): MonoTypeOperatorFunction<T> {
  let tokens = count
  const tokenChanged = new BehaviorSubject(tokens)
  const consumeToken = () => tokenChanged.next(--tokens)
  const renewToken = () => tokenChanged.next(++tokens)
  const availableTokens = tokenChanged.pipe(filter(() => tokens > 0))

  return mergeMap<T, Observable<T>>((value: T) =>
    availableTokens.pipe(
      take(1),
      map(() => {
        consumeToken()
        timer(slidingWindowTime, scheduler).subscribe(renewToken)
        return value
      }),
    ),
  )
}

И вы можете использовать его нравится. Я хочу получить все контракты в contractIds $ от API. Я только хочу отправлять 4 запроса каждые 1000 мс

const contracts$ = contractIds$.pipe(
  rateLimit(4, 1000),
  mergeMap(contract => this.get(contract.DocumentNumber)),
)

Может быть, это поможет вам:)

0 голосов
/ 24 апреля 2020

Вы можете использовать завершение наблюдаемой в сочетании с объединить все .

объединить все будет генерировать следующую наблюдаемую после завершения предыдущей

1. Создайте свой предмет

const source$$ = new Subject();

2. Предоставьте функцию, которая отображает ваше значение в Observable, которое завершается через 1000 мс

const timeCompletedSource$ = (time) => (value) => Observable.create(observer => observer.next(v)).pipe(
  takeUntil(timer(time))
);

Вам не нужно делать функцию времени динамической c, которую я только что сделал (время ) => (value) => ... потому что я хотел написать оператор типа throttle (1000), который имеет динамический диапазон времени c. Вы можете просто написать (значение) => Наблюдаемый ... если вы хотите установить c время

3. Используйте вашу функцию, чтобы сопоставить ваше значение с наблюдаемой во времени наблюдаемой и объединить все наблюдаемые вместе в concatAll

const result$ = source$$.pipe(
  map(timeCompletedSource$(time))
  concatAll()
);

Здесь вы найдете работающий стек

Pro: сделать пользовательский оператор

const fastThrottle = (time) => (source) => source.pipe(
  map(timeCompletedSource$(1000)),
  concatAll()
)

const result$ = source$$.pipe(
  fastThrottle(1000)
);
...