RX JS bufferedAmount / накапливать значение / уменьшать при сбросе - PullRequest
2 голосов
/ 27 апреля 2020

У меня есть поток значений

[ 0.5, 0.3, 0.4, 0.6, 1.4, 0.3, 0.6 ]

, и я хочу превратить это в

[           1         2         1 ]

, поэтому мы накапливаем значения первого потока до целого числа достигается (по крайней мере, 1), а затем испустить целое число при накоплении оставшейся суммы.

Это совершенно ошеломляет мой разум и думаю, что решение находится за углом с помощью SwitchMap.

Ответы [ 2 ]

2 голосов
/ 27 апреля 2020

Вот мой подход:

src$ = src$.pipe(publish());

const wholeNumber$ = src$.pipe(
  scan(
    (acc, crt) => (acc | 0) > 1 ? crt + (+(acc - (acc | 0)).toPrecision(1)) : acc + crt, 0
  ),
  map(v => (v | 0)),
  filter(v => v >= 1),
);

src$.pipe(
  buffer(wholeNumber$)
).subscribe();

publish убедится, что источник не подписан несколько раз. Это также короткая версия multicast(new Subject()), которая является способом многоадресной рассылки источника. Чтобы это работало, src$ должен излучать асинхронно, чтобы используемый Subject мог правильно зарегистрировать своих подписчиков (wholeNumber$ и другого).

Если источник не испуская асинхронно, вы можете заставить сделать это, используя src$.pipe(observeOn(asapScheduler)), который будет планировать каждое уведомление как обещание.

Давайте внимательно рассмотрим cb, предоставленный scan :

(acc, crt) => (acc | 0) > 1 ? crt + (+(acc - (acc | 0)).toPrecision(1)) : acc + crt`

number | 0 совпадает с Math.trunc(number).

In +(acc - (acc | 0)).toPrecision(1):

  • когда вы делаете 1.2 - 1, вы ' получаю: 0.199...96; с toPrecision(1): (1.2 - 1).toPrecision(1) = "0.2". + получит 0.2 как число.
0 голосов
/ 28 апреля 2020

Спасибо, Андрей,

Я использовал ваш код и изменил его на пользовательский оператор.


export const bufferAmount = (
  amount: number
): MonoTypeOperatorFunction<number> => (
  source: Observable<number>
): Observable<number> =>
  new Observable<number>((observer) => {
    let bufferSum = 0;
    return source.subscribe({
      next(value) {
        bufferSum += value;
        if (bufferSum < amount) return;
        const nextSum = Math.trunc(bufferSum);
        bufferSum -= nextSum;
        observer.next(nextSum);
      },
      error(error) {
        observer.error(error);
      },
      complete() {
        observer.complete();
      },
    });
  });

ticker.pipe(bufferAmount(1)).subscribe();

...