Могу ли я уведомить BehaviorProcessor из потока RxJava? - PullRequest
0 голосов
/ 16 января 2019

Я хотел бы получить ваш отзыв о коде ниже.Мне интересно, можно ли безопасно вызывать currentSession.onNext(result.session) из потока SessionManager.signIn.

Моя первая интуиция - сказать NO из-за проблем с многопоточностью и синхронизацией, то есть, основываясь на этом коде, яможет звонить currentSession.onNext(result.session) из разных тем.

Вот код, пожалуйста, дайте мне знать, что вы думаете!Спасибо

SessionManager, который является одноэлементным

@Singleton
class SessionManager @Inject constructor(
    private val sessionService: SessionService,
){

    val currentSession = BehaviorProcessor.create<Session>()

    fun signIn(login: String, password: String): Single<Boolean> =
        sessionService.signIn(login, password)
            .doOnNext(result -> 
                if (session is Success) {
                   currentSession.onNext(result.session)
                }
            ).map { result ->
                when (result) {
                    is Success -> true
                    else -> false
                }
            }
            .subscribeOn(Schedulers.io())
}

HomeView, который является случайным представлением, подписывающимся на поток входа SessionManager signIn

class HomeView(val context: Context) : View(context) {

        @Inject
        lateinit var sessionManager: SessionManager

        private val disposables = CompositeDisposable()

        override fun onAttachedToWindow() {
            super.onAttachedToWindow()

            disposables.add(sessionManager.signIn("username", "password")
                .distinctUntilChanged()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe { result ->
                    textView.text = if (result) "Success" else "Fail"
                })
        }

        override fun onDetachedFromWindow() {
            super.onDetachedFromWindow()
            disposables.clear()
        }
    }

Случайный вид, наблюдающий currentSessionот SessionManager

class RandomView(val context: Context) : View(context) {

        @Inject
        lateinit var sessionManager: SessionManager

        private val disposables = CompositeDisposable()

        override fun onAttachedToWindow() {
            super.onAttachedToWindow()

            disposables.add(sessionManager.currentSession
                .distinctUntilChanged()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe { session -> userTextView.text = session.userName })
        }

        override fun onDetachedFromWindow() {
            super.onDetachedFromWindow()
            disposables.clear()
        }
    }

1 Ответ

0 голосов
/ 16 января 2019

Документация BehaviorProcessor гласит:

Вызов onNext (Object), offer (Object), onError (Throwable) и onComplete () должен быть сериализован (вызван из одного потока или вызван непересекающимся образом из разных потоков через внешние средства сериализации). Метод FlowableProcessor.toSerialized (), доступный для всех FlowableProcessors, обеспечивает такую ​​сериализацию и также защищает от повторного входа (т. Е. Когда нижестоящий подписчик, потребляющий этот процессор, также хочет вызвать onNext (Object) на этом процессоре рекурсивно).

Итак, если вы определите это так:

val currentSession = BehaviorProcessor.create<Session>().toSerialized()

тогда вы можете спокойно звонить onNext из любого потока, это не вызовет проблем с синхронизацией.

Примечания:

Я согласен, что обновление процессора должно быть в doOnNext вместо map.

Я думаю, что было бы лучше использовать Completable вместо Single<Boolean> и использовать ошибки Rx, чтобы указать, что препятствовало входу. Вы также должны определить обработчики ошибок в subscribe методах.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...