Наблюдаемые операторы «что-то делают» через некоторое время - PullRequest
0 голосов
/ 27 сентября 2018

В цепочке наблюдаемых rxjs, как я могу сделать что-то с доступом к текущему значению наблюдаемой после того, как прошло определенное количество времени?По сути, я ищу что-то вроде оператора tap , но оно выполняется только в том случае, если прошло определенное количество времени, не видя значения из наблюдаемой.Так что на самом деле это как комбинация нажатия и тайм-аута.

Я представляю себе что-то вроде следующего

observable$.pipe(
  first(x => x > 5),
  tapAfterTime(2000, x => console.log(x)),
  map(x => x + 1)
).subscribe(...);

Это выдуманный пример, а функция "tapAfterTime" нереальный.Но основная идея заключается в том, что если после подписки прошло 2000 мс, и наблюдаемое не увидело значения больше 5, то сделайте функцию обратного вызова tapAfterTime для любого текущего значения наблюдаемого.Если бы мы увидели значение больше 5 до 2000 мс, обратный вызов tapAfterTime никогда не запустился бы, но функция map всегда работала бы так, как ожидалось.

Существует ли оператор для достижения этого или любой комбинации операторов?

Ответы [ 3 ]

0 голосов
/ 27 сентября 2018

Это может быть что-то вроде этого:

let cancel;
observable$.pipe(
  tap((x)=>clear=setTimeout(()=>console.log(x), 2000)),
   filter(x => x > 5),
   tap(x => clearTimeout(clear)),
   map(x => x + 1)
);
0 голосов
/ 28 сентября 2018

Может быть, это действительно слишком сложно, может быть, это стоит посмотреть.

Идея состоит в том, чтобы иметь 2 различные наблюдаемые, созданные для преобразования источника observable$ и затем в конечном итоге объединить.

first Observable, назовем его obsFilterAndMapped, это тот, где вы выполняете фильтрацию и отображение.

Второй Observable, назовем его obsTapDelay, это Observable, который запускает новый таймер с определенным задержка в любое время, когда первая Наблюдаемая, т.е. obsFilterAndMapped, испускает - после того, как delayTime пройден, вы выполняете действие tapAfterTime - если первая Наблюдаемая испускает новое значение перед delayTime передается, затем создается новый таймер.

Этот код реализует эту идею

const stop = new Subject<any>();
const obsShared = observable$.pipe(
    finalize(() => {
        console.log('STOP');
        stop.next();
        stop.complete()
    }),
    share()
);
const delayTime = 300;
const tapAfterTime = (value) => {
    console.log('tap with delay', value)
}; 

let valueEmitted;

const obsFilterAndMapped = obsShared.pipe(
    tap(val => valueEmitted = val),
    filter(i => i > 7),
    map(val => val + ' mapped')
);

const startTimer = merge(of('START'), obsFilterAndMapped);

const obsTapDelay = startTimer.pipe(
    switchMap(val => timer(delayTime).pipe(
        tap(() => tapAfterTime(valueEmitted)),
        switchMap(() => empty()),
    )),
    takeUntil(stop),
)

merge(obsFilterAndMapped, obsTapDelay)
.subscribe(console.log, null, () => console.log('completed'))

При таком подходе вы выполняете действие tapAfterTimeв любое время источник observable$ не излучает ничего дольше delayTime.Другими словами, это будет работать не только для первой эмиссии observable$, но и для всей ее жизни.

Вы можете протестировать такой код с помощью следующего ввода

const obs1 = interval(100).pipe(
    take(10),
);
const obs2 = timer(2000, 100).pipe(
    take(10),
    map(val => val + 200),
);
const observable$ = merge(obs1, obs2);

Еще немного поработавмы можем даже подумать, чтобы скрыть глобальную переменную valueEmitted внутри замыкания, но это увеличило бы сложность кода и, вероятно, не стоило.

0 голосов
/ 27 сентября 2018

Взгляните на

А возможно что-то подобное?

--

initiateTimer() {
    if (this.timerSub) {
        this.timerSub.unsubscribe();
    }

    this.timerSub = Rx.Observable.timer(2000)
        .take(1)
        .subscribe(this.showPopup.bind(this));
}
...