Как использовать RXJS для обновления устаревших, но активных данных - PullRequest
0 голосов
/ 04 марта 2019

У меня есть массив наблюдаемых в качестве хранилища данных, и я хочу убедиться, что горячие не устареют.Например, если нет подписчиков на наблюдаемое, мне не нужно его обновлять, но если есть подписчики, то без обновления не должно пройти более 5 минут.

тайм-аут , кажется, то, что я хочу, но я не хочу, чтобы он выдавал ошибку, я просто хочу запустить код, когда он истекает (аналогично tap). timeoutwith еще ближе, потому что я могу заменить его на наблюдаемое.Однако это не кажется «правильным», и после одного тайм-аута наблюдаемая замена не будет иметь присоединенного оператора timeoutWidth.

function getData(index){
    return database[index].pipe(timeoutWith(5*60*1000, database[index]))
}

Один из подходов, о котором я только что подумал, - это добавить рекурсив addTimeout метод, но это кажется хакерским:

private addTimeout(obs:Observable<any>, time:number):Observable<any>{
    return obs.pipe(timeoutWith(time,this.addTimeout(obs,time)));
  }

РЕДАКТИРОВАТЬ, добавляя код с использованием тайм-аута / повторных попыток:

const staleDataTime = 5*60*1000; // 5 minutes


// bhs makes it so you can update the value manually whenever you want
let bhs = new BehaviorSubject<Summary>(initialSummary);

// This will reset bhs after the timeout
bhs.pipe(timeout(staleDataTime)).subscribe(success=>null,err=>bhs.next(null));


let obs = bhs.pipe(
   switchMap(val=>val?of(val):this.retreiveSummary(filter)),
   timeout(staleDataTime),
   retry(),
   distinctUntilChanged((a:Summary,b:Summary) => a.count==b.count && a.sum == b.sum),      
   share()
);

obs.subscribe(val=>console.log("obs val: ", val));

Ответы [ 3 ]

0 голосов
/ 04 марта 2019

Для этого есть RX оператор

obs.pipe(
    switchMap(()=>timer(5*60*1000).pipe(
       tap(_=>..dosomething),
       repeat())
  )
)
0 голосов
/ 04 марта 2019

Вам даже не нужен пользовательский оператор для этого, потому что вы можете использовать timeout() и retry():

return database[index].pipe(
  timeoutWith(5 * 60 * 1000),
  retry(),
);

retry() повторно подпишется на свой источник, если он выдаст ошибку.Тем не менее, все это ожидает, что database[index] обновляется всегда, когда вы подписываетесь на него.

0 голосов
/ 04 марта 2019

Я понял, что это не так уж сложно сделать с пользовательским оператором

import { pipe } from 'rxjs';
import { tap } from 'rxjs/operators';


export const tapTimeout = (onTimeout:(()=>any), delay:number) => {
    let timeout = window.setTimeout(onTimeout, delay);
    return pipe(tap(x=> {
        window.clearTimeout(timeout);
        timeout = window.setTimeout(onTimeout,delay);
    }));
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...