Обеспечить последовательное обновление состояния при использовании оператора сканирования RX Java - PullRequest
0 голосов
/ 24 февраля 2020

Я пытаюсь реализовать шаблон обновления избыточного состояния, используя RX Java

val subject=PublishSubject.create()
val subject1=PublishSubject.create()

// multiple threads posting 
// on subject and subject1 here. Concurrently 


subject.mergeWith(subject1)
       .scan(
             getInitState(),
             {state, event ->
               // state update here 
             }
         )
        .subscribe({state ->
          // use state here
        })

Как видите, я использую оператор scan для поддержания состояния.

Как я могу быть уверен, что обновления состояния происходят последовательно, даже когда несколько потоков генерируют события?

Есть ли какой-то механизм в операторе scan, который заставляет события стоять в какой-то очереди, ожидая, пока функция обновления текущего состояния завершится до sh?

Что я сделал:

Я успешно реализовал этот шаблон в среде Android. Это действительно легко, потому что, если вы всегда выполняете обновление состояния в

AndroidSchedulers.mainThread()

и делаете объект состояния неизменным, вы гарантированно получаете атомы c и последовательное обновление состояния. Но что произойдет, если у вас нет выделенного планировщика для обновлений состояния? Что если вы не подключены к Android?

Что я исследовал:

  • Я прочитал исходный код оператора scan и нет ожидающей очереди. Просто простое обновление состояния и эмиссия

  • Я также прочитал исходный код SerializedSubject. Там действительно есть очередь ожидания, которая сериализует выбросы. Но что произойдет, если у меня есть два предмета? Сериализация их обоих не означает, что они не мешают друг другу.

1 Ответ

2 голосов
/ 24 февраля 2020

Для принудительного выполнения в одном потоке вы можете явно создать планировщик одного потока для замены AndroidSchedulers.mainThread():

val singleThreadScheduler = Schedulers.single()

Даже если события отправляются в другие потоки, вы можете убедиться, что вы обрабатываете их только в вашем отдельном потоке, используя observeOn:

subject.mergeWith(subject1)
   .observeOn(singleThreadScheduler)
   .scan(
         getInitState(),
         {state, event ->
           // state update here 
         }
     )
    .subscribe({state ->
      // use state here
    })  

Разница между observeOn и subscribeOn может быть довольно запутанной, и регистрация идентификатора потока может быть полезна для проверки всего, что работает в потоке вы ожидаете.

http://reactivex.io/documentation/scheduler.html

...