Таймер со слиянием прекращает испускать значение, даже если родительский исходный поток все еще испускает новые - PullRequest
0 голосов
/ 12 декабря 2018

Я пытаюсь объединить 3 состояния потока моего приложения в 1. Цикл отключен <-> подключить <-> авторизован.Поэтому, когда приложение подключено только к серверу, оно будет пытаться отправлять аутентификацию каждые 3 секунды.Если он не отключен или уже не аутентифицирован.

Я использую таймер rxjs для отправки аутентификации в потоке, который должен быть объединен, и takeUntil, чтобы прекратить отправку аутентификации, если он был аутентифицирован.Но проблема заключается в том, что вместо завершения потока sendAuth объединенный auth $ прекращает отправлять новый ответ, даже несмотря на то, что до слияния auth $ продолжает выдавать ответ.Вот код:

this._response$ = fromEvent<string>(this._socket, "response")
  .pipe(map(data => JSON.parse(data)));

const disconnect$ = fromEvent(this._socket, "disconnect");
const connect$ = fromEvent(this._socket, "connect");
const auth$ = this._response$.pipe(
  // this stream still emitting even if takeUntil is declared on connect$.
  filter(res => res.action === "authentication" && res.type === "success"),
);

status$ = merge(
  disconnect$.pipe(
    map(() => {
      this._status = statusEnum.OFF;
      return this._status;
    })
  ),
  connect$.pipe(
    // will send auth each 3s after connected
    // until it is authenticated or disconnected
    switchMap(() => timer(0, 3000).pipe(
      tap(() => console.log('Sending auth right after connect')),
      map(() => this._sendAuth()),
      // completing the status$ stream instead of switchMap
      takeUntil(auth$),
    )),
    map(() => {
      this._status = statusEnum.ON;
      return this._status;
    }),
  ),
  auth$.pipe(
    // this stream stop emitting if takeUntil is declared on connect$.
    map(() => {
      this._status = statusEnum.AUTHENTICATED;
      return this._status;
    })
  ),
);

EDIT : объединенный auth $ начинает выдавать значение снова после добавления оператора share в конвейер auth для объединения.

const auth$ = this._response$.pipe(
  // this stream still emitting even if takeUntil is declared on connect$.
  filter(res => res.action === "authentication" && res.type === "success"),
  share(),
);

1 Ответ

0 голосов
/ 13 декабря 2018
// assume a isConnect stream which return a boolean indicate connection status
let isConnect$ = new BehaviorSubject(false);

// assume a behavior subject to store a auth status
let isAuth$ = new BehaviorSubject(false);

disconnect$.subscribe(() => this.isConnect$.next(false));

connect$.subscribe(() => this.isConnect$.next(true));

// combine isConnect and isAuth together
let status$ = combineLatest(isConnect$, isAuth$).pipe(
  map(() => (isAuth ? statusEnum.AUTHENTICATED : isConnect ? statusEnum.ON : statusEnum.OFF)),
);

// a timer for sending auth every 3 secs and only send if status is statusEnum.ON;
let auth$ = timer(0, 3000)
  .pipe(
    switchMapTo(status$),
    filter(status => status === statusEnum.ON),
  )
  .subscribe(() => this.sendAuth());

// in this.sendAuth, after successfully get auth, you have to make isAuth$.next(true);
...