Сингл с сыпучим? - PullRequest
       6

Сингл с сыпучим?

0 голосов
/ 23 января 2019

Попробуйте в rxJava2 Kotlin объединить Single с Flowable, но ничего не происходит: не понимает, что не так

  Flowable.create<Int>({ emmit ->

            loadNewListener = object :Listener {
                override fun onEmit(id: Int) {
                    emmit.onNext(id)
                }
            }
        }, BackpressureStrategy.LATEST)
                .debounce(500, TimeUnit.MILLISECONDS)
                .flatMapSingle {
                    loadNew(id = it.id)
                }
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe({ (data:Data) ->

                }, {

                    Timber.e("Failed load data ${it.message}")
                })

мой метод возвращает Single:

private fun loadNew(id: Int): Single<Data> {

            return when (pdfType) {

                CASE_0 -> {

                    Single.create<Data> { emmit ->

             service.get("data")
                    .enqueue(
                    object : Callback<Void> {
                         override fun onFailure(call: Call<Void>?, t: Throwable?) {
                            // failure
                        }

                          override fun onResponse(call: Call<Void>?, response:  Response<Void>?) {
                 emmit.onSuccess(it.data)
            }
                        }
                    }//single
                }//case_0


                CASE_1 -> 1Repository.loadsome1Rx(id = id).map { it.getData() }

                CASE_2 -> 2Repository.loadsom2LocalRx(id = id).map { it.getData() }

                else -> {
                    throw java.lang.RuntimeException("$this is not available type!")
                }
            }

Что не так im myкод?Нужно, чтобы Maby вызывал Single в Flowable subscribe () seppurate, как это?

Flowable.create<Int>({ emmit ->
        loadNewListener = object :Listener {
            override fun onEmit(id: Int) {
                emmit.onNext(id)
            }
        }
    }, BackpressureStrategy.LATEST)
            .debounce(500, TimeUnit.MILLISECONDS)

          .subscribe({
              loadNew(id = it.id)

          }, {
              Timber.e("")
          })

Этот код работает, но выглядит не так просто, как с помощью комбайна.

Ответы [ 2 ]

0 голосов
/ 23 января 2019

Как сказал @Stas Bondar в ответ ниже Этот простой пример, основанный на вашем коде, работает !!

Проблема была в loadNewListener.

Не запускается во времени и имеет нулевое значение при необходимости.Вызовите create Flowable для init ViewModel, но loadNewListener не успел создать, когда я вызываю его из фрагмента.

loadNewListener = object: Listener {...}

Becuseнужно некоторое время для выражения init rxJava!

И объединить переменные с простыми с помощью flatMapSingle потратили больше времени, чем просто вызов одного на текучих dubscrinbe!

Поэтому используйте временное поле:

     private var temp: Temp? = null

        fun load(id: Int) {

 loadNewListener.apply {

                when {
                    this != null -> load(id = id)

                    else -> userEmitPdfTemp = Temp(id = id)
                }
            }
        }




 Flowable.create<Data>({ emmit ->

                    userEmitPdfTemp?.let {id->

                        emmit.onNext(Data(id))
                        userEmitPdfTemp =null

                    }

                    loadNewListener = object :Listener {
                        override fun load(id: Int) {

                            emmit.onNext(Data(id))

                        }
                    }

                }
0 голосов
/ 23 января 2019

Этот простой пример, основанный на вашем коде, работает

var i = 0
fun foo() {
    Flowable.create<Int>({ emmit ->
        emmit.onNext(i)
        i++
    }, BackpressureStrategy.LATEST)
            .debounce(500, TimeUnit.MILLISECONDS)
            .flatMapSingle {
                Single.create<String> { emmit ->
                    emmit.onSuccess("onSuccess: $it")
                }
            }
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({
                Log.i("RX", "Subscribe: $it")
            }, {
                it.printStackTrace()
            })
}

Проверка SingleEmitter.onSuccess() и SingleEmitter.onError() вызывается во всех случаях в when (pdfType)...

...