Оператор switchOnNext не генерирует подписки после последней вставленной наблюдаемой - PullRequest
0 голосов
/ 27 марта 2019

Во-первых, некоторый фон (возможно, есть лучший способ сделать это):

У нас есть модуль, который излучает входящие сообщения Bluetooth на определенную наблюдаемую. Затем мы обрабатываем эти сообщения и, наконец, в конце подписываемся, чтобы отправлять сообщения вперед. Эта обработка может измениться в какое-то время, что для большей части обработки означает воссоздание промежуточных наблюдаемых и всех наблюдаемых, которые зависят от нее (поскольку они будут обрабатывать недействительные данные сейчас).

Мы хотели бы изменить его так, чтобы воссоздание некоторой части обработки не требовало воссоздания всего, что от него зависит, в основном, чтобы нам не приходилось постоянно помнить, что зависело от чего, а также чтобы операторы с внутренними состояние (например, буфер, сканирование или отмена) не теряет это внутреннее состояние.

Перспективное решение:

Используя оператор switchOnNext, мы решили бы эту проблему. Всякий раз, когда промежуточная наблюдаемая воссоздается, мы просто добавляем это к источнику switchOnNext, и тот, кто подписался на выходные данные switchOnNext, немедленно получит новые результаты.

Проблема:

Если обработка после switchOnNext должна измениться, она прекратит получать результаты до тех пор, пока не произойдут предыдущие наблюдаемые изменения. Это означает, что у нас сейчас есть проблема с противоположной стороной. всякий раз, когда какая-либо часть изменяется, мы должны рекурсивно воссоздать все, от чего это зависит. Это немного лучше (гораздо легче отслеживать, что зависит от чего-то, чем все, что от него зависит), но наблюдаемые все равно теряют внутреннее состояние, поскольку их приходится создавать заново.

Такое поведение, по-видимому, противоречит тому, что в документации сказано, что должно произойти, но оно явно не говорит так или иначе.

Пример кода:

Этот код демонстрирует проблему.

import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject

fun main() {
    //Observable of observables
    val publishSubject: PublishSubject<Observable<Int>> = PublishSubject.create()
    //Observable to subscribe to get the most recent values
    val observable: Observable<Int> = Observable.switchOnNext(publishSubject)

    observable.subscribe { println("1: $it") }
    //Now 1 is subscribed

    val obsAux1 = PublishSubject.create<Int>()

    observable.subscribe { println("2: $it") }
    //Now 1 and 2 are subscribed

    publishSubject.onNext(obsAux1)

    observable.subscribe { println("3: $it") }
    //Now 1, 2 and 3 are subscribed

    //Should print out from subscriptions 1, 2 and 3, but only 1 and 2 printed
    obsAux1.onNext(1)

    val obsAux2 = PublishSubject.create<Int>()

    publishSubject.onNext(obsAux2)

    observable.subscribe { println("4: $it") }
    //Now 1, 2, 3 and 4 are subscribed

    //Should not print anything
    obsAux1.onNext(2)
    //Should print out from subscriptions 1, 2, 3 and 4, but only 1, 2 and 3 printed
    obsAux2.onNext(3)
} 

Вывод этого кода:

1: 1
2: 1
1: 3
2: 3
3: 3

Ожидаемый результат:

1: 1
2: 1
3: 1 <--- This is missing
1: 3
2: 3
3: 3
4: 3 <--- This is missing

При первом запуске obsAux1 должны быть напечатаны все три подписки, но только те, которые были добавлены для публикации, опубликованы.

Во второй раз, когда генерируется obsAux1, ничего не должно печататься, поскольку obsAux2 уже вставлен. Это работает как ожидалось

При первом запуске obsAux2 должны быть напечатаны все четыре подписки. Третья подписка печатается как положено, что следует, чтобы подписка работала нормально Но четвертая подписка ничего не печатает, так как она была добавлена ​​после вставки obsAux2 в publishSubject.

Ответы [ 2 ]

0 голосов
/ 31 марта 2019

Решение состоит в том, чтобы просто использовать BehaviourSubject вместо PublishSubject, по крайней мере, для наблюдаемых наблюдаемых.

Различие между ними заключается в том, что в новой подписке PublishSubject будет излучать только дополнительные элементы,в то время как объект BehaviourSubject немедленно испускает последний элемент и продолжает нормально.

Я все еще не согласен с тем, что так оно и должно работать, но в любом случае это решает нашу проблему.

Код, на случай, если кому-то понадобитсяэто (просто изменение в первой строке основного и дополнительного импорта):

import io.reactivex.subjects.BehaviorSubject
import io.reactivex.subjects.PublishSubject

fun main() {
    //Observable of observables
    val publishSubject: BehaviorSubject<Observable<Int>> = BehaviorSubject.create()
    //Observable to subscribe to get the most recent values
    val observable: Observable<Int> = Observable.switchOnNext(publishSubject)

    observable.subscribe { println("1: $it") }
    //Now 1 is subscribed

    val obsAux1 = PublishSubject.create<Int>()

    observable.subscribe { println("2: $it") }
    //Now 1 and 2 are subscribed

    publishSubject.onNext(obsAux1)

    observable.subscribe { println("3: $it") }
    //Now 1, 2 and 3 are subscribed

    //Should print out from subscriptions 1, 2 and 3, but only 1 and 2 printed
    obsAux1.onNext(1)

    val obsAux2 = PublishSubject.create<Int>()

    publishSubject.onNext(obsAux2)

    observable.subscribe { println("4: $it") }
    //Now 1, 2, 3 and 4 are subscribed

    //Should not print anything
    obsAux1.onNext(2)
    //Should print out from subscriptions 1, 2, 3 and 4, but only 1, 2 and 3 printed
    obsAux2.onNext(3)
}
0 голосов
/ 27 марта 2019

Вы должны использовать switchOnNext следующим образом. В документе они упоминают

{@ code switchOnNext} подписывается на ObservableSource, который испускает ObservableSources. Каждый раз, когда он наблюдает один из этих излучаемых ObservableSources, ObservableSource, возвращаемый {@code switchOnNext} начинает испускать элементы, испускаемые этим ObservableSource. Когда создается новый ObservableSource, {@code switchOnNext} прекращает испускать элементы из ранее выпущенных ObservableSource и начинает испускать предметы из нового.

следующий код даст ожидаемый результат. Исходя из логики, вы измените его следующим образом.

fun main()
{
val publishSubject: PublishSubject<Observable<Int>> = PublishSubject.create()
val observable: Observable<Int> = Observable.switchOnNext(publishSubject)
observable.subscribe { println("1: $it") }

val obsAux1 = PublishSubject.create<Int>()
observable.subscribe { println("2: $it") }
observable.subscribe { println("3: $it") }
publishSubject.onNext(obsAux1)
obsAux1.onNext(1)

val obsAux2 = PublishSubject.create<Int>()
publishSubject.onNext(obsAux2)
observable.subscribe { println("4: $it") }
obsAux1.onNext(2)

publishSubject.onNext(obsAux2)
obsAux2.onNext(3) 
}
...