Поскольку комментарий Мартина предлагает использовать объединение и задайте начальное значение для каждой части.Предполагая, что у вас есть myUser$
и myUserWebsocket$
наблюдаемые, вы можете использовать следующий код для их объединения.Сначала добавьте начальное значение и сохраните наблюдаемые в новых.Объедините их с combineLatest
.Вы должны подписаться на un cold combined$
observable.
UPDATE
Другой подход - использование merge
и объединение его с withLatestFrom
.
// Using RxJS v6+
import { merge } from 'rxjs';
import { startWith, map, withLatestFrom } from 'rxjs/operators';
// Your allready defined observables named e.g. myUser$ and myUserWebsocket$
// Add a starting value, since if user fires, websocket has no value yet.
const user$ = myUser$.pipe(
startWith(undefined)
)
const userWebsocket$ = myUserWebsocket$.pipe(
startWith(undefined)
)
const combined$ = merge(
user$,
userWebsocket$
).pipe(
withLatestFrom(user$),
withLatestFrom(userWebsocket$),
map(([[merge, latestUser], latestWebsocket]) => {
if (merge.user_id) { // merge is typeof userWebsocket, make sure user does not have user_id property
if (latestUser && merge.user_id === latestUser.id) {
return [latestUser, merge];
} else {
return [undefined, merge];
}
} else if (merge.id && !merge.user_id) { // merge is typeof user
if (latestWebsocket && merge.id === latestWebsocket.user_id) {
return [merge, latestWebsocket];
} else {
return [merge, undefined];
}
}
})
);
combined$.subscribe([user, websocket] => {
console.log(user, websocket);
});