Получите одно событие за интервал в 1000 миллисекунд - PullRequest
0 голосов
/ 01 апреля 2020

У меня есть бесконечный поток событий, которые могут генерировать несколько последовательных частей события, и я хочу принимать одно событие на 1000 каждые миллисекунды.

Я пытался debounceTime / auditTime / throttleTime, но это не так не включать все события, которые я хочу, - чтобы продемонстрировать поведение, которое я создал площадка для игры на стеке , которая запускает события один раз за 300 мс в течение 10 событий:

  • debounceTime(1000) выдаст только событие # 10

  • throttleTime(1000) выдаст события 1,5,9, но пропустит # 10, что необходимо

  • auditTime(1000) выдаст события 4,8

Здесь я хочу получить события 1,5,9,10 (одно событие на интервал 1000 мс). Как мне этого добиться?

const events$ = interval(300).pipe(
  map(num => `Firing event ${num + 1}`)
);

const source = events$.pipe(
  tap(console.log),
  take(10),
  // make some debouncing??
  map(x => `Received ${x}!`)
);

source.subscribe(x =>
  console.log(
    "%c" + x,
    "background: purple; color: white; padding: 3px 5px; border-radius: 3px;"
  )
);

Я также пытался играть с zip / combineLatest и излучать значения через интервал , но не повезло с этим

Ответы [ 2 ]

1 голос
/ 01 апреля 2020

ОБНОВЛЕНО

на основе обсуждения в комментариях

const events$ = timer(0, 6000).pipe(
  take(3),
  switchMap(x =>
    timer(0, 300).pipe(
      map(num => `event #${num + 1}`),
      take(x > 1 ? 9 : 10)
    )
  )
);

const source = merge(
  events$.pipe(
    tap(e => console.log(`%cStream: ${e}`, "color: blue;")),
    debounceTime(1000),
    tap(x => console.log(`%cdebounceTime captured: ${x}`, "color: red;"))
  ),
  events$.pipe(
    throttleTime(1000),
    tap(x => console.log(`%cthrottleTime captured: ${x}`, "color: green;"))
  ),
).pipe(
  // we need to avoid duplicates (like case with 9).
  // if all events aren't unique you need to use the original solution below.
  distinctUntilChanged(), // <-- if all events are unique.
  map(x => `Received ${x}!`)
);

source.subscribe(x =>
  console.log(
    "%c" + x,
    "background: purple; color: white; padding: 3px 5px; border-radius: 3px;"
  )
);

ОРИГИНАЛ

Надеюсь, это то, что вы хотели : https://take.ms/VP7tA

const events$ = interval(300).pipe(
    map(num => `Firing event ${num + 1}`)
);

const source = concat(events$.pipe(
    tap(console.log),
    take(10),
), timer(1000).pipe(switchMapTo(EMPTY)), events$.pipe(
    tap(console.log),
    take(10),
));

let lastTimer = 0;
const last$ = new Subject<number>();
merge(
    source.pipe(
      scan((state, event) => {
        state[1] = null;
        const stamp = new Date().getTime();
        clearTimeout(lastTimer);
        if (stamp - state[0] < 1000) {
          lastTimer = setTimeout(() => last$.next(event), (stamp - state[0]) + 50);
          return state;
        }
        state[0] = stamp;
        state[1] = event;
        return state;
      }, [0, null]),
      filter(([, event]) => event !== null),
      map(([, event]) => event || 0),
    ),
    last$,
).pipe(
    map(x => `Received ${JSON.stringify(x)}!`)
).subscribe(x =>
    console.log(
        "%c" + x,
        "background: purple; color: white; padding: 3px 5px; border-radius: 3px;"
    )
);
0 голосов
/ 01 апреля 2020

Вот один из подходов к этому:

const events$ = interval(300).pipe(
  map(num => `Firing event ${num + 1}`),
  share(),
);

const source$ = events$.pipe(
  take(10),
);

const lastValue$ = source$.pipe(last());

merge(source$.pipe(throttleTime(1000)), lastValue$).subscribe(console.log);

share() убедится, что источник (наблюдаемый, созданный interval by) подписан только один раз, когда

last() вернется последнее значение nexted , когда его источник (source$ в этом случае) завершается из-за take(10)

EDIT

Исходя из моего понимания проблемы, это будет другая альтернатива:

const events$ = merge(
  timer(300), // 1
  timer(600), // 2
  timer(900), // 3
  timer(1301), // 4
  timer(1500), // 5
  timer(1800), // 6
  timer(2302), // 7
).pipe(
  map((_, num) => `Firing event ${num + 1}`),
  share(),
);

const lastValue$ = events$.pipe(debounceTime(1000));

const source$ = events$.pipe(
  throttleTime(1000),
  buffer(lastValue$)
)
.subscribe(console.log)
  • debounceTime(1000) - если 1с прошло без каких-либо уведомлений

  • buffer(lastValue$) - когда lastValue$ испускает отправьте собранные значения

StackBlitz

...