Я не понимаю, почему switchMap не работает должным образом.
В первом случае switchMap не прерывает Observable, но во втором случае он работает как положено. Пожалуйста, объясните это поведение.
val test = TestObserver<Int>()
Observable.fromArray(10, 20)
.switchMap({
// 1. switchMap does not interrupt generator
Observable.generate(
Callable{
generateSequence(it, { n -> if (n < it + 9) n + 1 else null}).iterator()
},
BiConsumer<Iterator<Int>, Emitter<Int>>(){ data, emitter ->
if (data.hasNext()) {
Thread.sleep(100)
emitter.onNext(data.next())
}else {
emitter.onComplete()
}
}
)
// 2. switchMap disposes observable
//Observable.just(it).delay(1, TimeUnit.SECONDS)
},2)
.subscribeWith(test)
test.await()
// 1. >> [10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
// But I expect [20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
// 2. >> [20]
print(test.values())
Что я хочу сделать.
На самом деле мой генератор выполняет тяжелую работу, основываясь на значении switchMap, и я хочу прервать эту работу, если появилось новое значение. В начале генератор запускает транзакцию, а в конце генератор фиксирует или отменяет транзакцию.