Я пытаюсь объединить 3 состояния потока моего приложения в 1. Цикл отключен <-> подключить <-> авторизован.Поэтому, когда приложение подключено только к серверу, оно будет пытаться отправлять аутентификацию каждые 3 секунды.Если он не отключен или уже не аутентифицирован.
Я использую таймер rxjs для отправки аутентификации в потоке, который должен быть объединен, и takeUntil, чтобы прекратить отправку аутентификации, если он был аутентифицирован.Но проблема заключается в том, что вместо завершения потока sendAuth объединенный auth $ прекращает отправлять новый ответ, даже несмотря на то, что до слияния auth $ продолжает выдавать ответ.Вот код:
this._response$ = fromEvent<string>(this._socket, "response")
.pipe(map(data => JSON.parse(data)));
const disconnect$ = fromEvent(this._socket, "disconnect");
const connect$ = fromEvent(this._socket, "connect");
const auth$ = this._response$.pipe(
// this stream still emitting even if takeUntil is declared on connect$.
filter(res => res.action === "authentication" && res.type === "success"),
);
status$ = merge(
disconnect$.pipe(
map(() => {
this._status = statusEnum.OFF;
return this._status;
})
),
connect$.pipe(
// will send auth each 3s after connected
// until it is authenticated or disconnected
switchMap(() => timer(0, 3000).pipe(
tap(() => console.log('Sending auth right after connect')),
map(() => this._sendAuth()),
// completing the status$ stream instead of switchMap
takeUntil(auth$),
)),
map(() => {
this._status = statusEnum.ON;
return this._status;
}),
),
auth$.pipe(
// this stream stop emitting if takeUntil is declared on connect$.
map(() => {
this._status = statusEnum.AUTHENTICATED;
return this._status;
})
),
);
EDIT : объединенный auth $ начинает выдавать значение снова после добавления оператора share в конвейер auth для объединения.
const auth$ = this._response$.pipe(
// this stream still emitting even if takeUntil is declared on connect$.
filter(res => res.action === "authentication" && res.type === "success"),
share(),
);