import { combineLatest } from 'rxjs';
const observable_1 = get_first_observable();
const observable_2 = get_second_observable();
console.log('first log', observable_1, observable_2);
observable_1.subscribe(e => console.log('second log', e));
observable_2.subscribe(e => console.log('third log', e));
const combined = combineLatest(observable_1, observable_2);
console.log('fourth log', combined);
combined.subscribe(e => console.log('fifth log', e)); // throws TypeError warning, doesn't work
У меня есть часть кода, которая вела себя так же, как и выше, она работала нормально до вчерашнего дня, когда я внесла некоторые изменения в то, что мне показалось несвязанным кодом.
first log
подтверждает, что обе эти переменные действительно Observables
.
second log
и third log
подтверждают, что каждый из наблюдаемых излучает хотя бы одно значение.
fourth log
подтверждает, что combineLatest()
возвращает тип Observable
.
fifth log
никогда не срабатывает, и комментирование этой строки удаляет предупреждение. Везде, где я пытаюсь сделать combined.subscribe()
, выдается следующее предупреждение TypeError.
TypeError: You provided an invalid object where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.
Что здесь происходит ?? Я перепробовал все, что могу придумать, но безрезультатно. Мне кажется, что если я передаю две действительные наблюдаемые в combineLatest()
, и каждый из них испускает значение, то он должен работать как положено.
По запросу источники двух наблюдаемых являются следующими:
import { BehaviorSubject } from 'rxjs';
import { distinctUntilChanged, map } from 'rxjs/operators';
const get_first_observable = () => {
const subject = new BehaviorSubject<string>(null);
// I am using subject.next(...) elsewhere
return subject.asObservable().pipe(distinctUntilChanged());
};
const get_second_observable = () => {
// where store is a redux Store
const store_subject = new BehaviorSubject(store.getState());
store.subscribe(() => {
store_subject.next(store.getState());
});
const stream = store_subject.asObservable();
const my_observable = stream.pipe(
map(state => {
return format_data(state) || [];
})
);
return my_observable;
};
Хорошо, новое обновление.
Это действительно странно для меня, и я совсем этого не понимаю, но я добавил строку кода ниже в get_second_observable()
, прежде чем он вернет my_observable
, и затем волшебным образом весь код работает. Теперь, если я прокомментирую эту строку, она больше не будет работать ... Как это имеет смысл? Я не сохраняю новую наблюдаемую переменную или не использую ее вообще. Похоже, что их объединение заставляет его работать по цепочке.
combineLatest(my_observable, of(true));