Observable.switchMap не прерывает Observable - PullRequest
0 голосов
/ 10 января 2019

Я не понимаю, почему switchMap не работает должным образом. В первом случае switchMap не прерывает Observable, но во втором случае он работает как положено. Пожалуйста, объясните это поведение.

    val test = TestObserver<Int>()

    Observable.fromArray(10, 20)
        .switchMap({
            // 1. switchMap does not interrupt generator
            Observable.generate(
                Callable{
                    generateSequence(it, { n -> if (n < it + 9) n + 1 else null}).iterator()
                },
                BiConsumer<Iterator<Int>, Emitter<Int>>(){ data, emitter ->
                    if (data.hasNext()) {
                        Thread.sleep(100)
                        emitter.onNext(data.next())
                    }else {
                        emitter.onComplete()
                    }
                }
            )
            // 2. switchMap disposes observable
            //Observable.just(it).delay(1, TimeUnit.SECONDS)
        },2)
        .subscribeWith(test)
    test.await()
    // 1. >> [10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
    // But I expect [20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
    // 2. >> [20]
    print(test.values())

Что я хочу сделать.

На самом деле мой генератор выполняет тяжелую работу, основываясь на значении switchMap, и я хочу прервать эту работу, если появилось новое значение. В начале генератор запускает транзакцию, а в конце генератор фиксирует или отменяет транзакцию.

...