RxJS дросселирует то же значение, но пропускает новые значения - PullRequest
0 голосов
/ 05 декабря 2018

«Здесь у вас есть», кто-то говорит, и вам передается этот поток значений, который вы хотите сделать вразличноUntilChanged () при ...

Input:  '1-1----11---2--1122----1---2---2-2-1-2---|'
Output: '1-----------2--1-2-----1---2-------1-2---|'

Пока ничего странного,
Но теперь кто-то говорит «все в порядке», если то же самое значение приходит снова, «но только если это не скоро!».Я хочу по крайней мере '----' тиков между одним и тем же значением.«Окей», - говорите вы и добавляете газ

const source = new Subject<number>();

// mysterious cave troll is randomly source.next(oneOrTwo)

const example = source.pipe(throttle(val => interval(4000)));

Input:  '1-1----11---2--1122----1---2---2-2-1-2-----|'
Output: '1------1----2----2-----1-------2-----2-----|'

«Это не то, что я хочу! Посмотрите на все пропущенные значения», имея в виду, что вы ограничиваете все передаваемые значения.

Input:  '1-1----11---2--1122----1---2---2-2-1-2-----|'
Output: '1------1----2----2-----1-------2-----2-----|'
        '-------------->1<--------->2<----->1<------|' <-- Missed values

«Вот, позвольте мне показать вам шоу», - говорит таинственный человек и дает вам этот

Требуемый результат

Input:  '1-1----11---2--1112----1---2---2-2-1-2-----|'
Output: '1------1----2--1--2----1---2-----2-1-------|'

Мой ответ таков:комбинированное окно не подойдет.

От кого-то более опытного,
это сложная проблема для решения? (или я пропустил очевидное решение)

Ответы [ 5 ]

0 голосов
/ 05 декабря 2018

Я нашел решение, которое работает, кто-то имеет какое-либо отношение к этому?

source.pipe(
   windowTime(4000),
   concatMap(obs => obs.pipe(distinct()))
);

Примеры из ранее, в StackBlitz примере

ОБНОВЛЕНИЕ: это на самом деле не работает на 100%.Это только принять текущее окно во внимание.Так, например, вы можете иметь

`[1-12][2---]` which would give `1--22---|`

, где [----] будет представлять временное окно.Другими словами, если значение сначала выводится последним в одном окне и сначала выводится в следующем окне, то же самое значение будет проходить сразу после друг друга.

Спасибо @ eric99 за то, что заставили меня понять это.

0 голосов
/ 05 декабря 2018

Вот сложное решение, основанное на теории операторов, но я не уверен, что оно действительно работает, потому что сначала мне нужно смоделировать излучение источника.

Таким образом, для дросселя и отдельного потока всегда кэшируется последнее значение, zip следит за тем, чтобы они всегда передавались в паре, zip всегда излучает, когда какой-либо поток излучает, потому что это shareReplay (1).

Мы всегда берем значение emit изifiveStream, даже когда поток zip запускается дросселем, потому что ProperStream всегда имеет последнее кэшированное значение.

const throttleStream= source.pipe(throttle(val => interval(4000)),shareReplay(1))
const distinctStream= source.pipe(distinctUntilChanged(),shareReplay(1))
zip(throttleStream,distinctStream).pipe(
   map((t,d)=>d)
)
0 голосов
/ 05 декабря 2018

Это моя вторая попытка, она фильтрует поток по выводу (вместо того, чтобы брать отдельныйUntil), затем регулирует и объединяет два потока.

Конечно, у нас может не быть известного набора значений (1,2, ... n).
Если я смогу выяснить эту морщинку, добавлю еще один пример.

const output = merge(
  source.pipe( filter(x => x === 1), throttle(val => interval(ms))),
  source.pipe( filter(x => x === 2), throttle(val => interval(ms)))
)

Вот мой чек (мс = 4000)

input         1-1----11---2--1112----1---2---2-2-1-2-----
expected      1------1----2--1--2----1---2-----2-1-------

filter(1)     1-1----11------111-----1-----------1-------
throttle(1)   1------1-------1-------1-----------1-------

filter(2)     ------------2-----2--------2---2-2---2-----
throttle(2)   ------------2-----2--------2-----2---------

merged        1------1----2--1--2----1---2-----2-1-------
expected      1------1----2--1--2----1---2-----2-1-------

Расширение до n значений

Я думаю, что это будет работать, когда набор значений в потоке заранее неизвестен (или имеет большой диапазон, поэтому расширение предыдущего ответа нецелесообразно).

Это должно работать до тех пор, пока источник завершит работу.

merge(
  source.pipe(
    distinct().pipe(
      mapTo(distinctVal => source.pipe( 
        filter(val = val === distinctVal), 
        throttle(val => interval(ms))
      )
    )  
  )
)

У меня еще нет доказательств, я опубликую это позже.

0 голосов
/ 05 декабря 2018

Сначала у меня возникла идея как-то объединить distinctUntilChanged() и throttleTimte(), однако я не смог найти решение, а затем я попробовал что-то еще.

ОператорЯ придумал throttleDistinct(), который работает так, как вы хотели бы: StackBlit Editor Link

Он имеет 2 параметра:

  1. duration: number в миллисекундах и аналогично длительности в throttleTime(duration: number)
  2. equals: (a: T, b: T) => boolean - функция для сравнения, если предыдущий элемент равен следующему элементу, реализация по умолчанию которого (a, b) => a === b

import { of, fromEvent, interval, Observable } from 'rxjs';
import { map, scan, filter, } from 'rxjs/operators';

const source = fromEvent(document, 'keypress')
  .pipe(map((x: any) => x.keyCode as number))

source
  .pipe(
    throttleDistinct(1000),
  )
  .subscribe((x) => console.log('__subscribe__', x));

export function throttleDistinct<T>(
  duration: number,
  equals: (a: T, b: T) => boolean = (a, b) => a === b
) {
  return (source: Observable<T>) => {
    return source
      .pipe(
        map((x) => {
          const obj = { val: x, time: Date.now(), keep: true };
          return obj;
        }),
        scan((acc, cur) => {
          const diff = cur.time - acc.time;

          const isSame = equals(acc.val, cur.val)
          return diff > duration || (diff < duration && !isSame)
            ? { ...cur, keep: true }
            : { ...acc, keep: false };
        }),
        filter((x) => x.keep),
        map((x) => x.val),
      )
  }
}
0 голосов
/ 05 декабря 2018

Вне моей головы, вы хотите буферизовать по временному интервалу, а затем различаться в каждом буфере.

Эффективно, вы хотите перезапускать / перезагружать отдельный прогон каждые n миллисекунд.

source.pipe(
  bufferTime(ms),
  mergeMap(bufferArray => from(bufferArray).pipe(distinctUntilChanged()) )
)
...