Проблема возникает из-за того, что storage.session.getItem
испускает , как только оно подписано , а merge(a$, b$)
подписывается на a$
, прежде чем оно подписывается на b$
, поэтому a$
получает событие, но Время подписки b$
пришло слишком поздно. Эта проблема хорошо известна и называется glitch (в реактивном программировании) и обычно возникает, когда существует ромбовидный потоковый граф , что в точности соответствует вашему случаю.
У меня есть два сообщения в блоге о глюках, которые могут дать вам больше контекста: Учебник по планировщикам RxJS и Глюки Rx на самом деле не проблема . В нем упоминается RxJS, но xstream довольно близок к RxJS с точки зрения реализации. Разница между xstream и RxJS заключается в том, что потоки xstream всегда являются многоадресными («общими»), а RxJS имеет много типов планировщиков, но xstream имеет только один. Планировщик по умолчанию в RxJS работает так же, как xstream.
Решение состоит в том, чтобы применить .remember()
до достижения бриллианта. Это связано с тем, что передаваемое значение необходимо кэшировать для других потребителей этого потока. .remember()
просто преобразует Stream в MemoryStream. Я думаю, что исходный поток изначально был MemoryStream, и отображение MemoryStream создает другие MemoryStreams, но filter
- это оператор, который нарушает это. filter
всегда возвращает поток, и причина этого в том, что MemoryStreams всегда должен иметь текущее значение, но filter
может удалить значений, поэтому возможно, что отфильтрованный поток не имеют любое текущее значение.
Как автор Cycle.js, я полагаю, что способ создания cycjs / storage не самый лучший, и я думаю, что мы найдем способы разработки этих API, чтобы путаница с MemoryStream и Stream была сведена к минимуму. Но на данный момент важно понять разницу между этими двумя понятиями и спланировать свое приложение так, чтобы избегать алмазов (и сбоев), или использовать .remember()
в нужных местах.