Получение элементов в onNext () после вызова dispose () в RxJava - PullRequest
0 голосов
/ 25 февраля 2019

Это то, что я прочитал о dispose() здесь :

В двух словах, когда Disposable (реализуемый TestObserver) удаляется, Observer (также TestObserver) больше не будет получать значения из Observable.

Вот мой код:

private fun createObservableWithDisposable() {
    Observable
            .create { e: ObservableEmitter<String> ->
                val worker = Schedulers.io().createWorker()
                e.setDisposable(worker)
                worker.schedule {
                    for (i in 1..5) {
                        if (i == 3) {
                            worker.dispose()
                            // https://medium.com/@vanniktech/rxjava-2-disposable-under-the-hood-f842d2373e64
                            // After calling dispose(), the subscriber no longer receives items passed in OnNext().
                            // But it doesn't work in my code
                        }
                        e.onNext("Event $i on thread ${Thread.currentThread().name}")
                    }
                }
            }
            .subscribe(
                    { s ->
                        Log.d(TAG, "createObservableWithDisposable onNext msg=$s")
                    },
                    { e ->
                        Log.d(TAG, "createObservableWithDisposable", e)
                    },
                    {
                        Log.d(TAG, "createObservableWithDisposable onComplete")
                    }
            )
}

И вот что я вижу в Logcat:

2019-02-25 08:10:53.414 12071-12101/ru.sample D/RxJavaSamples: createObservableWithDisposable onNext msg=Event 1 on thread RxCachedThreadScheduler-1
2019-02-25 08:10:53.414 12071-12101/ru.sample D/RxJavaSamples: createObservableWithDisposable onNext msg=Event 2 on thread RxCachedThreadScheduler-1
2019-02-25 08:10:53.414 12071-12101/ru.sample D/RxJavaSamples: createObservableWithDisposable onNext msg=Event 3 on thread RxCachedThreadScheduler-1
2019-02-25 08:10:53.414 12071-12101/ru.sample D/RxJavaSamples: createObservableWithDisposable onNext msg=Event 4 on thread RxCachedThreadScheduler-1
2019-02-25 08:10:53.414 12071-12101/ru.sample D/RxJavaSamples: createObservableWithDisposable onNext msg=Event 5 on thread RxCachedThreadScheduler-1

Я ожидал увидеть только первые два выброса.Т.е. я думал, что после звонка dispose() onNext() звонить не будут.

1 Ответ

0 голосов
/ 25 февраля 2019

вы удалили работника, который предоставлял предметы, а не подписчика.

Чтобы прекратить получать предметы, попробуйте

val compositeDisposable = CompositeDisposable()
    compositeDisposable.add(
            Observable
                    .create { e: ObservableEmitter<String> ->
                        val worker = Schedulers.io().createWorker()
                        e.setDisposable(worker)
                        worker.schedule {
                            for (i in 1..5) {
                                if (i == 3) {
                                    compositeDisposable.dispose() //changed here
                                }
                                e.onNext("Event $i on thread ${Thread.currentThread().name}")
                            }
                        }

                    }
                    .subscribe(
                            { s ->
                                Log.d(TAG, "createObservableWithDisposable onNext msg=$s")
                            },
                            { e ->
                                Log.d(TAG, "createObservableWithDisposable", e)
                            },
                            {
                                Log.d(TAG, "createObservableWithDisposable onComplete")
                            }

                    )
    )
...