Я пытаюсь реализовать шаблон обновления избыточного состояния, используя RX Java
val subject=PublishSubject.create()
val subject1=PublishSubject.create()
// multiple threads posting
// on subject and subject1 here. Concurrently
subject.mergeWith(subject1)
.scan(
getInitState(),
{state, event ->
// state update here
}
)
.subscribe({state ->
// use state here
})
Как видите, я использую оператор scan
для поддержания состояния.
Как я могу быть уверен, что обновления состояния происходят последовательно, даже когда несколько потоков генерируют события?
Есть ли какой-то механизм в операторе scan
, который заставляет события стоять в какой-то очереди, ожидая, пока функция обновления текущего состояния завершится до sh?
Что я сделал:
Я успешно реализовал этот шаблон в среде Android. Это действительно легко, потому что, если вы всегда выполняете обновление состояния в
AndroidSchedulers.mainThread()
и делаете объект состояния неизменным, вы гарантированно получаете атомы c и последовательное обновление состояния. Но что произойдет, если у вас нет выделенного планировщика для обновлений состояния? Что если вы не подключены к Android?
Что я исследовал:
Я прочитал исходный код оператора scan
и нет ожидающей очереди. Просто простое обновление состояния и эмиссия
Я также прочитал исходный код SerializedSubject. Там действительно есть очередь ожидания, которая сериализует выбросы. Но что произойдет, если у меня есть два предмета? Сериализация их обоих не означает, что они не мешают друг другу.