Чтобы сериализовать доступ к demoDao.get()
, есть несколько способов добиться этого, но старайтесь не использовать блокировку, чтобы сделать это, поскольку это может заполнить реактивный поток тупиками для стартеров (как вы узнали).
Если вы действительно хотите использовать блокировку, вы должны убедиться, что блокировка не передается для потокового сигнала, такого как передача в нисходящий поток или запрос в восходящий.В этой ситуации вы можете использовать блокировку (недолговечную).
Один из подходов - объединить действия двух потоков в один (скажем, merge
) и выполнить demoDao
для этого потока.
Другой подход заключается в создании PublisheSubject
с использованием PublishSubject.create().serialized()
, который выполняет demoDao.get()
и затем подписывается на него только один раз.Тогда два упомянутых вами источника могут .doOnNext(x -> subject.onNext())
.Зависит от того, должен ли каждый источник знать о сбое независимо или допустимо, чтобы подписка PublishSubject
была единственным местом, где сообщалось о сбое.