doOnDispose не вызывается при подписке на фоновый планировщик - PullRequest
0 голосов
/ 24 октября 2018

Я пытаюсь соединить существующий rx-код и API, который использует фьючерсы.Когда я вручную располагаю наблюдаемое, я ожидаю, что onDispose () будет вызываться всегда.Обычно это происходит, но когда я указываю собственный планировщик, он иногда не вызывается.Мой пример:

class Work {

private val disposables = CompositeDisposable()

fun getFuture(): ListenableFuture<String> {

    val future = ResolvableFuture.create<String>()

    disposables.add(

            Observable.fromCallable {
                try {
                    Thread.sleep(2000)
                } catch (ex: InterruptedException) {

                }
                "1"
            }.firstOrError()
                .onErrorReturn { "2" }
                .doOnDispose {
                    println("disposing 1 on ${Thread.currentThread().name}")
                    //sometimes this dispose does not get called
                    future.set("2")
                }
                .subscribeOn(Schedulers.io())
                .doOnDispose {
                    println("disposing 2 on ${Thread.currentThread().name}")
                    //only this dispose gets called every time
                    //future.set("2")
                }
                .subscribe(Consumer {
                    future.set("2")
                })
    )

    return future
}

fun stop() {
    disposables.clear()
}

}

@Test
fun `doOnDispose does not get called`() {
    println("------------")

    for (i in 1..100) {

        val work = Work()

        val future = work.getFuture()

        println("Cancelling")

        work.stop()

        println("Getting ${Thread.currentThread().name}")
        val result = future.get(2, TimeUnit.SECONDS)

        assertEquals("2", result)

        println("------------")
    }
}

Что происходит, только когда каждый раз вызывается onDispose?Тот, что перед .subscribeOn () иногда вообще не вызывается.

1 Ответ

0 голосов
/ 24 октября 2018

Вы смешиваете режимы здесь.Не делай этого.Либо используйте RxJava, либо используйте Futures.Вы создали начало технического кошмара.

В планировщике io() создается нить, которая сразу спит в течение 2 секунд.Цепочка наблюдателя удаляется («выбрасывая 2»), высвобождая ресурсы, и dispose() возвращается обратно по цепочке.Однако dispose() ничего не будет делать в потоке io(), потому что он заблокирован sleep().Теперь существует условие состязания, независимо от того, завершается ли поток или сначала выполняется операция удаления.

Я не могу посоветовать вам, как решить проблему, поскольку я не знаю, что вы пытаетесь сделать.Я просто знаю, что то, что у тебя есть, ненадежно.

...