Как реализовать цепную блокировку в RxJava - PullRequest
0 голосов
/ 05 июня 2018

Я хочу реализовать блокировку в своем приложении, чтобы позволить только одному фрагменту цепочки выполняться одновременно, а любой другой ждать друг друга.

Например:

    val demoDao = DemoDao() // data that must be accessed only by one rx-chain fragment at one time

    Observable.range(0, 150)
            .subscribeOn(Schedulers.io())
            .flatMapCompletable {
                dataLockManager.lock("action") { // fragment-start
                    demoDao.get()
                        .flatMapCompletable { data ->
                            demoDao.set(...)
                        }
                }                                // fragment-end
            }
            .subscribe()

    Observable.range(0, 100)
            .subscribeOn(Schedulers.io())
            .flatMapCompletable {
                dataLockManager.lock("action") { // fragment-start
                    demoDao.get()
                            .flatMapCompletable { data ->
                                demoDao.set(...)
                            }
                }                                // fragment-end
            }
            .subscribe()

Я пыталсяреализовать его через пользовательский Completable.create с CountDownLatch, но это может привести к тупиковой ситуации.

И на этом я отстой.Что вы можете мне порекомендовать?

Ответы [ 2 ]

0 голосов
/ 07 июня 2018

в асинхронном мире использование замков настоятельно не рекомендуется.Вместо этого блокировка моделируется последовательным выполнением актера или последовательного исполнителя.В свою очередь, актер может моделироваться Obserever, а серийный исполнитель - Schedulers.single(), хотя более опытные программисты RxJava могут дать больше советов.

0 голосов
/ 05 июня 2018

Чтобы сериализовать доступ к demoDao.get(), есть несколько способов добиться этого, но старайтесь не использовать блокировку, чтобы сделать это, поскольку это может заполнить реактивный поток тупиками для стартеров (как вы узнали).

Если вы действительно хотите использовать блокировку, вы должны убедиться, что блокировка не передается для потокового сигнала, такого как передача в нисходящий поток или запрос в восходящий.В этой ситуации вы можете использовать блокировку (недолговечную).

Один из подходов - объединить действия двух потоков в один (скажем, merge) и выполнить demoDao для этого потока.

Другой подход заключается в создании PublisheSubject с использованием PublishSubject.create().serialized(), который выполняет demoDao.get() и затем подписывается на него только один раз.Тогда два упомянутых вами источника могут .doOnNext(x -> subject.onNext()).Зависит от того, должен ли каждый источник знать о сбое независимо или допустимо, чтобы подписка PublishSubject была единственным местом, где сообщалось о сбое.

...