RXJS flatMap к повторяющимся наблюдаемым - PullRequest
1 голос
/ 09 ноября 2019

Я пытаюсь внедрить службу, которая обеспечивает наблюдаемость, если приложение подключено к моему серверу или нет, поэтому, когда браузер подключен к сети, мы пропингуем сервер с таймером. Вот код:

public get $connected(): Observable<boolean> {
    return this.hasInternetConnection
               .asObservable()
               .pipe(
                 distinctUntilChanged(),
                 flatMap((connected: boolean) => {
                   if (!connected) {
                     return of(connected);
                   } else {
                     return timer(5000)
                       .pipe(
                         map(() => {
                           var success = Math.random() > 0.5;
                           console.log('PING: ' + success);
                           return success;
                         })
                       );
                   }
                 })
               );
  }

hasInternetConnection - это просто объект BehaviorSubject, привязанный к событиям окна online и offline, таймер эмулирует эхо-запрос на мой сервер API.

Проблема в том, чтомоя подписка $connected ловит только первое значение из наблюдаемого таймера, а затем не работает. После hasInternetConnection тема меняется на false и обратно на true, моя подписка снова получает первое значение, а затем ничего. Вот что я вижу в консоли:

PING: true
subscription tap
PING: true
PING: false
PING: true
...

Как я могу это исправить? Спасибо!

1 Ответ

0 голосов
/ 15 ноября 2019

Полное решение:

  private hasInternetConnection: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(navigator.onLine);
  private connectedSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(true);
  private recheckConnectionSubject: Subject<void> = new Subject<void>();

  constructor(
    private readonly http: HttpClient,
  ) {
    fromEvent(window, 'online')
      .pipe(takeUntil(this.destroyed))
      .subscribe(() => {
        this.hasInternetConnection.next(true);
      });
    fromEvent(window, 'offline')
      .pipe(takeUntil(this.destroyed))
      .subscribe(() => {
        this.hasInternetConnection.next(false);
      });
    merge(
      this.hasInternetConnection,
      this.recheckConnectionSubject,
    )
      .pipe(
        mapTo(this.hasInternetConnection.value),
        switchMap((connected: boolean) => {
          if (!connected) {
            return of(connected);
          } else {
            return timer(0, 30000)
              .pipe(
                mergeMapTo(this.http.get(`${environment.apiRoot}/ping`, { responseType: 'text' })
                               .pipe(
                                 map((res) => {
                                   return true;
                                 }),
                                 catchError(() => {
                                   return of(false);
                                 })
                               )
                ),
              );
          }
        })
      )
      .subscribe(this.connectedSubject);
  }

  public get $connected(): Observable<boolean> {
    return this.connectedSubject.asObservable()
               .pipe(
                 distinctUntilChanged(),
               );
  }

  public resetTimer(): void {
      this.recheckConnectionSubject.next();
  }
...