Во-первых, некоторый фон (возможно, есть лучший способ сделать это):
У нас есть модуль, который излучает входящие сообщения Bluetooth на определенную наблюдаемую. Затем мы обрабатываем эти сообщения и, наконец, в конце подписываемся, чтобы отправлять сообщения вперед. Эта обработка может измениться в какое-то время, что для большей части обработки означает воссоздание промежуточных наблюдаемых и всех наблюдаемых, которые зависят от нее (поскольку они будут обрабатывать недействительные данные сейчас).
Мы хотели бы изменить его так, чтобы воссоздание некоторой части обработки не требовало воссоздания всего, что от него зависит, в основном, чтобы нам не приходилось постоянно помнить, что зависело от чего, а также чтобы операторы с внутренними состояние (например, буфер, сканирование или отмена) не теряет это внутреннее состояние.
Перспективное решение:
Используя оператор switchOnNext, мы решили бы эту проблему. Всякий раз, когда промежуточная наблюдаемая воссоздается, мы просто добавляем это к источнику switchOnNext, и тот, кто подписался на выходные данные switchOnNext, немедленно получит новые результаты.
Проблема:
Если обработка после switchOnNext должна измениться, она прекратит получать результаты до тех пор, пока не произойдут предыдущие наблюдаемые изменения. Это означает, что у нас сейчас есть проблема с противоположной стороной. всякий раз, когда какая-либо часть изменяется, мы должны рекурсивно воссоздать все, от чего это зависит. Это немного лучше (гораздо легче отслеживать, что зависит от чего-то, чем все, что от него зависит), но наблюдаемые все равно теряют внутреннее состояние, поскольку их приходится создавать заново.
Такое поведение, по-видимому, противоречит тому, что в документации сказано, что должно произойти, но оно явно не говорит так или иначе.
Пример кода:
Этот код демонстрирует проблему.
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject
fun main() {
//Observable of observables
val publishSubject: PublishSubject<Observable<Int>> = PublishSubject.create()
//Observable to subscribe to get the most recent values
val observable: Observable<Int> = Observable.switchOnNext(publishSubject)
observable.subscribe { println("1: $it") }
//Now 1 is subscribed
val obsAux1 = PublishSubject.create<Int>()
observable.subscribe { println("2: $it") }
//Now 1 and 2 are subscribed
publishSubject.onNext(obsAux1)
observable.subscribe { println("3: $it") }
//Now 1, 2 and 3 are subscribed
//Should print out from subscriptions 1, 2 and 3, but only 1 and 2 printed
obsAux1.onNext(1)
val obsAux2 = PublishSubject.create<Int>()
publishSubject.onNext(obsAux2)
observable.subscribe { println("4: $it") }
//Now 1, 2, 3 and 4 are subscribed
//Should not print anything
obsAux1.onNext(2)
//Should print out from subscriptions 1, 2, 3 and 4, but only 1, 2 and 3 printed
obsAux2.onNext(3)
}
Вывод этого кода:
1: 1
2: 1
1: 3
2: 3
3: 3
Ожидаемый результат:
1: 1
2: 1
3: 1 <--- This is missing
1: 3
2: 3
3: 3
4: 3 <--- This is missing
При первом запуске obsAux1 должны быть напечатаны все три подписки, но только те, которые были добавлены для публикации, опубликованы.
Во второй раз, когда генерируется obsAux1, ничего не должно печататься, поскольку obsAux2 уже вставлен. Это работает как ожидалось
При первом запуске obsAux2 должны быть напечатаны все четыре подписки. Третья подписка печатается как положено, что следует, чтобы подписка работала нормально Но четвертая подписка ничего не печатает, так как она была добавлена после вставки obsAux2 в publishSubject.