Rxjs: takeUntil () неправильное поведение после того, как наблюдаемое было завершено и запущено снова? - PullRequest
0 голосов
/ 27 июня 2018

У меня есть следующий код:

 const reconnectorAfterPingPongFailed$ = Observable.of('timeout').delay(5000).takeUntil(this.serviceAlive$);

        reconnectorAfterPingPongFailed$
          .race(this.pingPong$)
          .take(1)
          .repeat()
          .takeUntil(this.serviceAlive$)
          .do(() => console.log("[ping-pong]:","repeated"))
          .subscribe((data: any) => {
// some code
});

что я ожидаю от этого кода?

  1. Каждые N секунд мы получаем от сервера сообщение "pong", которое использовалось для создания этого .pingPong $ .next ("pong")
  2. Это относится к race (), где, если есть pong, мы получаем команду «pong» в подписке или «timeout», если «pong» не существует для задержки более 5000 секунд (событие приходит от reconnectorAfterPingPongFailed$)
  3. Я хотел бы обработать подписку, только если мой сервис работает, скажем, пока я вхожу в систему.
  4. Поэтому, когда я выхожу из системы, я делаю .next() и .complete():
  5. При первом выходе из системы я не вижу сообщения console.log от .do()
  6. Когда я снова захожу, эта тема воссоздается снова. serviceAlive$: Subject<void> = new Subject<void>();
  7. И когда я снова вышел из системы, кажется, что этот код .do(() => console.log("[ping-pong]:","repeated")) все еще работает
...