Почему я должен подписаться на основной поток? - PullRequest
0 голосов
/ 22 сентября 2018

пытаясь выяснить кое-что из этого ресурса: https://www.raywenderlich.com/384-reactive-programming-with-rxandroid-in-kotlin-an-introduction

Я застрял с вопросом: почему я должен вызывать subscribeOn () в главном потоке вместо Schedulers.io ()?

Когда я делаю подписку таким образом, мое приложение зависает на пару секунд, и я сбрасываю кадры.

searchTextObservable
                .subscribeOn(Schedulers.io())
                .map { cheeseSearchEngine.search(it) }
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe {
                    showResult(it)
                }

Dropping frames screen

А потом я подписываюсь в главном потоке и наблюдаю за этим в Schedulers.io () (я тоже не понимаю, зачем мне это делать) приложение вообще не зависает.

searchTextObservable
                .subscribeOn(AndroidSchedulers.mainThread())
                .observeOn(Schedulers.io())
                .map { cheeseSearchEngine.search(it) }
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe {
                    showResult(it)
                }

Кто-нибудь может объяснить, почему это так работает?

РЕДАКТИРОВАТЬ

// 1
private fun createTextChangeObservable(): Observable<String> {
  // 2
  val textChangeObservable = Observable.create<String> { emitter ->
    // 3
    val textWatcher = object : TextWatcher {

      override fun afterTextChanged(s: Editable?) = Unit

      override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) = Unit

      // 4
      override fun onTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) {
        s?.toString()?.let { emitter.onNext(it) }
      }

    }

    // 5
    queryEditText.addTextChangedListener(textWatcher)

    // 6
    emitter.setCancellable {
      queryEditText.removeTextChangedListener(textWatcher)
    }
  }

  // 7
  return textChangeObservable
}

Ответы [ 3 ]

0 голосов
/ 27 сентября 2018

subscribeOn vs. наблюдаем

Подписаться будет вызывать метод создания Observable для данного планировщика.Неважно, сколько раз вы используете подписку.Первая подписка на source-observable (первая в цепочке) всегда побеждает.

наблюдать за переключением потока от оператора к оператору.Когда восходящий поток отправляет значение в потоке X, оно будет переключено на поток Y из заданного планировщика в наблюдаемом-операторе.Теперь все, что ниже, чем понаблюдать, будет обработано в потоке Y.

Наилучшее предположение на примере 1: Использование подписки вызовет Observable # create для Schedulers # io.Все в create-lambda будет вызываться в этом потоке из Schedulers # io.Обратный вызов слушателя (onTextChanged) может фактически происходить в другом потоке.В данном случае это UI-Thread, потому что это какой-то элемент UI.Теперь onNext будет вызываться из UI-Thread (emitter.onNext (it)).Значение будет передано оператору #map в потоке пользовательского интерфейса (.map {cheeseSearchEngine.search (it)}), а поиск cheeseSearchEngine # заблокирует поток в пользовательском интерфейсе.

Example2: используется в качестве первого оператора ".subscribeOn (AndroidSchedulers.mainThread ())».Это фактически не имеет никакого эффекта, потому что вы уже в UI-Thread.В этом случае create-lambda будет вызываться из AndroidSchedulers # mainThread.OnNext также будет передаваться в потоке пользовательского интерфейса, как раз в примере 1, потому что пользовательский интерфейс инициирует событие onTextChanged-Event.Затем это значение будет введено с помощью наблюдающего (Schedulers.io ()).Все из точки наблюдения будет выполнено в Schedulers # io-Thread.Это, в свою очередь, не будет блокировать пользовательский интерфейс, когда map выполняет какой-либо HTTP-запрос (или какой-то длительный ввод-вывод).После того, как карта закончена и отправляет следующее значение в нисходящем направлении, следующий наблюдающий (AndroidSchedulers.mainThread ()) переключится обратно на поток пользовательского интерфейса.Следовательно, теперь вы можете изменить UI в подписной лямбде, потому что вы находитесь в UI-Thread.В заключение можно отметить, что первая подписка в Примере2 может быть опущена, если не имеет значения, из какого потока происходит регистрация слушателя (слушатель-рег, вероятно, должен быть потокобезопасным).

Резюме: использование подписки будет тольковызовите создание лямбды на заданном Scheduler-Thread.Обратный вызов от зарегистрированного слушателя при создании может произойти в другом потоке.Вот почему Example1 будет блокировать UI-поток, а Example2 не будет.

0 голосов
/ 27 сентября 2018

Я думаю, что код немного вводит в заблуждение из-за оператора .map, который фактически используется для выполнения дорогостоящей операции (поиска).Лучший способ сделать это - обернуть код с помощью fromCallable и преобразовать его в асинхронный вызов с помощью subscribeOn.Примерно так:

searchTextObservable
            // Means start a new async search every time text is changed
            .flatMapSingle { Single
                .fromCallable { cheeseSearchEngine.search(it) }
                // This makes sure search is running on IO thread
                // This way expensive operation is done off the main thread, which eliminates the freeze
                .subscribeOn(Schedulers.io()) }
            // This makes sure that results will be handled on main thread
            // Important because you can only access Android Widgets from the main thread
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe {
                showResult(it)
            }

Объяснения в Кодексе.Я думаю, что цель теперь намного яснее.Подробности смотрите в ответе HansWursrt

0 голосов
/ 23 сентября 2018

Это красота Rx.Простое переключение потоков.По сути, в Rx мы можем переключаться между различными потоками, просто вызывая subscribeOn () или ObserveOn ().Разница между этими двумя заключается в том, что при вызове subscribeOn (Thread1) задача (в вашем примере - cheeseSearchEngine.search (it)) выполняется в Thread1.

Однако, когда вы вызываете наблюдаем (Thread2), результат выполненной задачи передается в Thread2.Это означает, что результат будет обработан в Thread2.(В вашем примере showResult будет вызываться в Thread2)

Таким образом, когда вы вызываете subscribeOn (Schedulers.io ()), задача выполняется в потоке ввода-вывода.Как только результат будет готов, он будет передан в поток основного пользовательского интерфейса при вызове наблюдать за (AndroidSchedulers.mainThread ()).

Когда все сделано наоборот, вы в основном пытаетесь выполнить задачу в потоке пользовательского интерфейса, а не с помощью ввода-вывода.фоновый поток.При таком подходе, если вы попытаетесь обновить любой элемент пользовательского интерфейса, будет сгенерировано исключение о том, что «элементы пользовательского интерфейса не могут быть доступны из фонового потока (CalledFromWrongThreadException: только исходный поток, создавший иерархию представления, может касаться его представлений)».

Надеюсь, я отвечу на ваш вопрос.Удачного кодирования в Rx.

...