Мне нужно подписаться на наблюдаемую, которая нуждается в другой наблюдаемой, чтобы подписаться.Что-то вроде:
val someSingle = Single.create<Response>{singleEmmiter ->
dataStream.subscribe {elementInDataStream->
if(elementInDataStream == "somethingiwant"){
.
.
.
val someresponse = //someCode
singleEmmiter.onSuccess(someResponse)
}
}
}
Мне нужно что-то подобное, потому что в dataStream у меня может быть что угодно, и, возможно, я могу использовать более одного элемента объекта dataStream (так что я думаю, что я не могу выполнять операции, такие как filter ondataStream).Где dataStream является экземпляром ConnectableObservable (Hot observable)
val dataStream = listOf("hello","world", "somethingiwant").toObservable().publish()
Тогда у меня есть что-то вроде:
someSingle.blockingGet()
dataStream.connect()
Этот код никогда не закончится, потому что someSingleгенерирует блокирующее ожидание и ожидает некоторый элемент в dataStream, но dataStream еще не отправляет какой-либо элемент.Генерация бесконечного блока.
Если я изменяю код следующим образом:
dataStream.connect()
someSingle.blockingGet()
Это создает проблему параллелизма, потому что, возможно, элемент "thingiwant " уже был опубликовангорячая наблюдаемая, поэтому мой блок никогда не закончится.
Возможно, какое-то решение состоит в том, чтобы использовать ReplaySubject вместо ConnectableObservable , но я думаю, что это не оптимальное решение.
Примечание: мне нужно использовать блокировку get, потому что ответ, который я хочу получить от блокировки, - это ответ контроллера.