RxJava2 - подписка на PublishSubject - PullRequest
0 голосов
/ 24 октября 2019
 private val searchSubject =  PublishSubject.create<Boolean>()
private val compositeDisposable = CompositeDisposable()

fun textChange(){
    searSubject.onNext(true)
}

fun getSubject(){
    compositeDisposable += searchSubject
        .doOnNext {
            if (it) showLoading()
        }
        .switchMap { searchGithubReposObservable() }
        .subscribeWith(object : DisposableObserver<List<GithubRepo>>() {
            override fun onNext(t: List<GithubRepo>) {
                hideLoading()
                adapter.items = t
            }

            override fun onComplete() {
            }

            override fun onError(e: Throwable) {
                hideLoading()
            }
        })
}
  • searchGithubReposObservable - это функция, которая возвращает Observable<List<GithubRepo>>

. Я искал образец кода в github для изучения RxJava. Однако я не могу понять приведенный выше код.

Я знаю, что для получения данных от PublishSubject мне необходимо подписать его.

В приведенном выше коде я думал, что subscribeWith подписываетсяФункция searchGithubReposObservable () возвращает Observable, но я могу получить данные из PublishSubject при вызове textchange ().

Почему это возможно?

Ответы [ 2 ]

1 голос
/ 24 октября 2019

В начале вашей цепочки RX вы слушаете тему публикации.

compositeDisposable += searchSubject
    .doOnNext {
        if (it) showLoading()
    }

Каждый раз, когда вы вызываете метод textChange(), вы нажимаете searchSubject, который снова запускает цепочку RX, вызывая переключение карты.

0 голосов
/ 24 октября 2019

Да, возможно, вы можете получить данные, когда вызывается метод textchange (). Я реализовал этот тип функциональности, когда набирал текст. Я написал, пожалуйста, проверьте

 autocompletetextview.debounce(500L, TimeUnit.MILLISECONDS)
                    .distinctUntilChanged()
                    .filter { it.trim().isNotEmpty() || it.isEmpty() }
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .flatMap {
                        Observable.just(callapi here )
                    }
                    .subscribe({
                        it.subscribe({ serviceResponse ->
                            if (serviceResponse.meta.status == KeyUtils.HTTP_SUCCESS ||
                                    serviceResponse.meta.status == KeyUtils.STATUS_META_ERROR) {
                                setSuccessResponse(serviceResponse, true)
                            } else {
                                setSuccessResponse(serviceResponse, false)
                            }
                        }, { throwable ->
                            setErrorResponse(throwable)
                        }).collect()
...