Как подписаться на заметку, которой нужна очередная горячая подписка - RxJava - PullRequest
0 голосов
/ 25 декабря 2018

Мне нужно подписаться на наблюдаемую, которая нуждается в другой наблюдаемой, чтобы подписаться.Что-то вроде:

val someSingle = Single.create<Response>{singleEmmiter ->
        dataStream.subscribe {elementInDataStream->
            if(elementInDataStream == "somethingiwant"){
                .
                .
                .
                val someresponse = //someCode
                singleEmmiter.onSuccess(someResponse)
            }
        }
    }

Мне нужно что-то подобное, потому что в dataStream у меня может быть что угодно, и, возможно, я могу использовать более одного элемента объекта dataStream (так что я думаю, что я не могу выполнять операции, такие как filter ondataStream).Где dataStream является экземпляром ConnectableObservable (Hot observable)

val dataStream = listOf("hello","world", "somethingiwant").toObservable().publish()

Тогда у меня есть что-то вроде:

someSingle.blockingGet()
dataStream.connect()

Этот код никогда не закончится, потому что someSingleгенерирует блокирующее ожидание и ожидает некоторый элемент в dataStream, но dataStream еще не отправляет какой-либо элемент.Генерация бесконечного блока.

Если я изменяю код следующим образом:

dataStream.connect()   
someSingle.blockingGet()

Это создает проблему параллелизма, потому что, возможно, элемент "thingiwant " уже был опубликовангорячая наблюдаемая, поэтому мой блок никогда не закончится.

Возможно, какое-то решение состоит в том, чтобы использовать ReplaySubject вместо ConnectableObservable , но я думаю, что это не оптимальное решение.

Примечание: мне нужно использовать блокировку get, потому что ответ, который я хочу получить от блокировки, - это ответ контроллера.

...