Объедините потоки RX js, обнаружить изменения и вернуть одно значение - PullRequest
1 голос
/ 07 апреля 2020

Фон - Angular +. net Core WebApi, скажем, я создаю приложение для управления временем, чтобы отслеживать, сколько времени вы потратили на задачи -> пользователь создает задачу во внешнем интерфейсе и запускает ее -> там таймер, показывающий истекшее время.

Идея - истекшее время исходит из бэкэнда. Когда пользователь запускает задачу во внешнем интерфейсе через службу, я запускаю таймер на внутреннем интерфейсе и передаю истекшее время обратно во внешний интерфейс, чтобы я мог показать его пользователю. Я хочу показать правильное значение, даже если есть проблема с соединением между бэкэндом и внешним интерфейсом.

Диаграмма: внутренний поток генерирует значение каждую секунду (истекшее время, которое я показываю пользователю), но есть сбой соединения и через 6 секунд он на мгновение зависает, а затем отправляет 9 («0:09»), что может сбить пользователя с толку («было 6 секунд, а сейчас 9 ??»). Поэтому я создаю интервал на внешнем интерфейсе, который генерирует новое значение каждую секунду. Каждую секунду я хочу проверить, послал ли внутренний поток новое значение, а если нет, то хочу получить предыдущее значение и изменить его, чтобы пользователь получил правильное значение.

bStream => ---1---2---3---4---5---6---x---x---9
fStream => ---1---2---3---4---5---6---7---8---9

What user sees:
00:01 -> 00:02 -> 00:03 -> 00:04 -> 00:05 -> 00:06 (freeze) -> 00:09

What I want to user to see:
00:01 -> 00:02 -> 00:03 -> 00:04 -> 00:05 -> 00:06 -> (freeze - frontend knows there is no new value so adds 1 second to previous)
So, it should look like that:
00:01 -> 00:02 -> 00:03 -> 00:04 -> 00:05 -> 00:06 -> 00:07 -> 00:08 -> 00:09

Я изо всех сил пытаюсь начать.

Сделал быструю скрипку с двумя потоками, но не могу понять, как его найти. BStream не выдал новое значение.

https://stackblitz.com/edit/typescript-yookbj

Ответы [ 2 ]

2 голосов
/ 07 апреля 2020

Вот один из подходов:

const be$ = concat(
  of(1).pipe(delay(100)),
  of(2).pipe(delay(100)),
  of(3).pipe(delay(100)),
  of(4).pipe(delay(100)),
  of(5).pipe(delay(100)),
  of(6).pipe(delay(100)),

  of(10).pipe(delay(500)), // After freeze
  of(11).pipe(delay(100)),
  of(12).pipe(delay(100)),
).pipe(shareReplay({ bufferSize: 1, refCount: true, }), endWith(null));

// `skip(1)` - the `ReplaySubject` used be `shareReplay()` will give us the latest value
// and it's not needed
const beReady$ = be$.pipe(skip(1), take(1));

const fe$ = be$.pipe(
  mergeMap(v => merge(
    of(v),
    of(v).pipe(
      expand(v => timer(100).pipe(map(v1 => 1 + v))),
      takeUntil(beReady$),
    )
  )),
  distinctUntilChanged(),
  filter(v => v !== null)
).subscribe(console.log)

endWith(null) - чтобы остановить рекурсию, когда испускается последнее значение (12), нам нужен источник, чтобы испустить что-то еще

shareReplay - необходимо предоставить доступ к источнику, поскольку будет другой подписчик (beReady$), кроме основного подписчика (fe$)

mergeMap(v => merge(
  of(v), // Pass along the current value
  of(v).pipe(
    // If the `be$` does not emit in the next 100ms, send the `currentValue + 1`
    // and keep doing the same until the `be$` finally emits
    expand(v => timer(100).pipe(map(v1 => 1 + v))),
    takeUntil(beReady$),
  )
)),

expand похоже на использование mergeMap, но:

  • он будет проходить по внутреннему значению
  • он создаст другую внутреннюю наблюдаемую на основе последнего внутреннего значения ; Итак, это рекурсивно
  • takeUntil(beReady$), как рекурсия может быть остановлена ​​

StackBlitz

2 голосов
/ 07 апреля 2020

пожалуйста, проверьте код ниже. чтобы смоделировать син c выпуск, измените код v * 5 на v * 4, тогда счетчик будет уважать значение из "бэкэнда", как только оно получено.

// emit every 5s
const source = interval(5000).pipe(
    map(v1 => v1 + 1), // <- only for the example. starting counting from 1.
    startWith(0), // <- only for the example. instant emit of 0.

    map(v => v * 5), // <- every 5 seconds we get right passed amount. so it emits 0 5 10 15.
);
// emit every 1s
const secondSource = interval(1000).pipe(
    delay(50), // <- we want it to be a bit later than original counter.
    map(v1 => v1 + 1), // emits are 1 2 3, not from 0.
    startWith(0), // instant emit of 0.
);

source.pipe( // <- it is our slow backend (it gives us an update every 5 seconds.
    switchMap(v => secondSource.pipe( // if no emit from parent stream - then in 1.05 we get value from this one.
        map(v1 => v + v1), // - adding offset from the second stream to the parent stream.
    )),
).subscribe(v => console.log(v));

теперь он считает от 0 до N даже есть лаги от бэкэнда.

ОБНОВЛЕНО

есть еще более простой способ, но проблема в том, что он не полагается вовремя, когда бэкэнд ответил и имеет собственный 1-секундный период.

pipe(
  bufferTime(1000), // <- collects for a second.
  scan((a, b) => b.length ? a + 1 : b[0], 0), // assumes or returns.
);
...