Перезапустите таймер на интервале rxjs - PullRequest
2 голосов
/ 17 апреля 2019

Я создал класс, который устанавливает паузу rxjs, наблюдаемую на интервале:

export class RepeatingServiceCall<T> {
  private paused = false;
  private observable: Observable<T>;
  constructor(serviceCall: () => Observable<T>, delay: number) {
    this.observable = interval(delay).pipe(flatMap(() => (!this.paused ? serviceCall() : NEVER)));
  }
  setPaused(paused: boolean) {
    this.paused = paused;
  }
  getObservable() {
    return observable;
  }
}

Кажется, это работает нормально, но проблема, которую я пытаюсь решить, заключается в том, что я хочу, чтобы таймер сбрасывался при отсутствии паузы,Итак, допустим, что интервал времени составляет 10 секунд и 5 секунд после того, как последний раз испускался интервал, setPaused(false) вызывается.В этом случае я хочу, чтобы он испустил немедленно, а затем перезапустить таймер.Было бы легко добавить что-то подобное?

Ответы [ 5 ]

3 голосов
/ 17 апреля 2019

Если вы используете timer вместо interval и установите начальную задержку на 0, то ваш интервал сработает немедленно.

Вы можете использовать оператор takeUntil, чтобы запретить выполнение интервала всегда, и оператор repeatWhen, чтобы перезапустить его в любое время:

import { Observable, Subject, timer } from 'rxjs';
import { repeatWhen, switchMap, takeUntil } from 'rxjs/operators';

export class RepeatingServiceCall<T> {
  readonly observable$: Observable<T>;
  private readonly _stop = new Subject<void>();
  private readonly _start = new Subject<void>();

  constructor(serviceCall: () => Observable<T>, delay: number) {
    this.observable$ = timer(0, delay)
      .pipe(
        switchMap(() => serviceCall()),
        takeUntil(this._stop),
        repeatWhen(() => this._start)
      );
  }
  start(): void {
    this._start.next();
  }
  stop(): void {
    this._stop.next();
  }
}

Вот рабочий пример StackBlitz .

P.S .: Методы получения и установки работают по-разному. Таким образом, вам не нужна классическая концепция получения, вы можете просто сделать атрибут public и readonly.

1 голос
/ 17 апреля 2019

Вы можете описать поведение, которое вы описываете, с помощью следующего фрагмента:

const delay = 1000;

const playing = new BehaviorSubject(false);

const observable = playing.pipe(
  switchMap(e => !!e ? interval(delay).pipe(startWith('start')) : never())
);

observable.subscribe(e => console.log(e));

// play:
playing.next(true);

// pause:
playing.next(false);
  • Когда playing Observable испускает true, оператор switchMap вернет новый interval Observable.
  • Используйте оператор startWith для немедленного создания события, когда он не используется.
  • Если вы хотите, чтобы интервал запускался автоматически при подписке на наблюдаемое, просто инициализируйте BehaviorSubject с помощьюtrue.

Пример StackBlitz

0 голосов
/ 10 мая 2019

проверьте этот код

/**
* it is a simple timer created by via rxjs
* @author KentWood
* email minzojian@hotmail.com
*/
function rxjs_timer(interval, times, tickerCallback, doneCallback, startDelay) {
    this.pause = function () {
        this.paused = true;
    }
    this.resume = function () {
        this.paused = false;
    }
    this.stop = function () {
        if (this.obs) {
            this.obs.complete();
            this.obs.unsubscribe();
        }
        this.obs = null;

    }
    this.start = function (interval, times, tickerCallback, doneCallback, startDelay) {
        this.startDelay = startDelay || 0;
        this.interval = interval || 1000;
        this.times = times || Number.MAX_VALUE;
        this.currentTime = 0;
        this.stop();

        rxjs.Observable.create((obs) => {
            this.obs = obs;
            let p = rxjs.timer(this.startDelay, this.interval).pipe(
                rxjs.operators.filter(() => (!this.paused)), 
              rxjs.operators.tap(() => {
                    if (this.currentTime++ >= this.times) {
                        this.stop();
                    }
                }),
              rxjs.operators.map(()=>(this.currentTime-1))
            );
            let sub = p.subscribe(val => obs.next(val), err => obs.error(err), () => obs
                .complete());
            return sub;
        }).subscribe(tickerCallback, null, doneCallback);
    }
    this.start(interval, times, tickerCallback, doneCallback, startDelay);
}
/////////////test/////////////
var mytimer = new rxjs_timer(
1000/*interval*/, 
10 /*times*/, 
(v) => {logout(`time:${v}`)}/*tick callback*/, 
() => {logout('done')}/*complete callback*/, 
2000/*start delay*/);

//call mytimer.pause() 
//call mytimer.resume() 
//call mytimer.stop() 




function logout(str){
  document.getElementById('log').insertAdjacentHTML( 'afterbegin',`<p>${str}</p>`)
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.1/rxjs.umd.js"></script>

<button onclick="mytimer.pause()"> pause</button>
<button onclick="mytimer.resume()"> resume</button>
<button onclick="mytimer.stop()"> stop</button>
<div id='log'></div>
0 голосов
/ 17 апреля 2019

Еще один подход с switchMap:

const { fromEvent, timer } = rxjs;
const { takeUntil, switchMap, startWith } = rxjs.operators;

const start$ = fromEvent(document.getElementById('start'), 'click');
const stop$ = fromEvent(document.getElementById('stop'), 'click');


start$.pipe(
  startWith(void 0), // trigger emission at launch
  switchMap(() => timer(0, 1000).pipe(
    takeUntil(stop$)
  ))
).subscribe(console.log);
<script src="https://unpkg.com/rxjs@6.4.0/bundles/rxjs.umd.min.js"></script>

<button id="start">start</button>
<button id="stop">stop</button>

И проще: merge s запускает и останавливает Observables, чтобы отключить их:

const { fromEvent, merge, timer, NEVER } = rxjs;
const { distinctUntilChanged, switchMap, mapTo, startWith } = rxjs.operators;

const start$ = fromEvent(document.getElementById('start'), 'click');
const stop$ = fromEvent(document.getElementById('stop'), 'click');


merge(
  start$.pipe(mapTo(true), startWith(true)),
  stop$.pipe(mapTo(false))
).pipe(
  distinctUntilChanged(),
  switchMap(paused => paused ? timer(0, 1000) : NEVER)
)
.subscribe(console.log);
<script src="https://unpkg.com/rxjs@6.4.0/bundles/rxjs.umd.min.js"></script>

<button id="start">start</button>
<button id="stop">stop</button>

И еще один, еще более странный подход, использующий repeat():

const { fromEvent, timer } = rxjs;
const { take, concatMap, takeUntil, repeat } = rxjs.operators;

const start$ = fromEvent(document.getElementById('start'), 'click');
const stop$ = fromEvent(document.getElementById('stop'), 'click');


start$.pipe(
  take(1),
  concatMap(()=>timer(0, 1000)),
  takeUntil(stop$),
  repeat()
).subscribe(console.log);
<script src="https://unpkg.com/rxjs@6.4.0/bundles/rxjs.umd.min.js"></script>

<button id="start">start</button>
<button id="stop">stop</button>

Просто хотел присоединиться к этой вечеринке:)

0 голосов
/ 17 апреля 2019

Вы можете отказаться от старого таймера при запуске и запустить новый при старте.

const { interval, Subject, fromEvent } = rxjs;
const { takeUntil } = rxjs.operators;

let timer$;

const pause = new Subject();

const obs$ = new Subject();

obs$.subscribe(_ => { console.log('Timer fired') });

function start() {
  timer$ = interval(1000);
  timer$.pipe(takeUntil(pause)).subscribe(_ => { obs$.next(); });
}

function stop() {
  pause.next();
  timer$ = undefined;
}

fromEvent(document.getElementById('toggle'), 'click').subscribe(() => {
  if (timer$) {
    stop();
  } else {
    start();
  }
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>
<button id="toggle">Start/Stop</button>
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...