Я создаю класс WebSocket для получения некоторых данных и пытаюсь использовать Flowable с FlowableEmitter для получения ответа сокета.
Я хочу удерживать эмиттер, и каждый раз, когда я получаю данные, которые я вызываю onNext и обрабатываю результат, он работает нормально, если я подписываюсь в каждом потоке, но не работает, если я использую FlatMap для отправки другого сообщения после первые текучие получают данные.
Я создаю Список эмиттеров:
var listEmitter: MutableList<FlowableEmitter<*>> = mutableListOf()
Затем я создаю функцию для регистрации наблюдателя:
inline fun <reified T> onMessage(data: ByteString?): Flowable<T> {
return Flowable.create<T>({ emitter ->
listEmitter.add(emitter)
//send the data to socket
data?.let { socket?.send(data) }
}, BackpressureStrategy.LATEST)
}
Когда я получаю данные из сокета, я звоню:
//I called [0] here to demonstrate the logic only
((listEmitter[0].subscriber) as FlowableEmitter<Any>).onNext(true)
И в моем докладчике я пытаюсь сделать что-то вроде:
socket
.onMessage(/*some ByteString*/)
.flatMap {
if (it) {
socket.onMessage(/*other data*/)
} else {
Flowable.just(false)
}
}.flatMap {
if (it) {
socket.onMessage(/*other data*/)
} else {
Flowable.just(false)
}
}.subscribe { /* do some stuff */}
Итак, при этом только первый FlatMap является триггером, второй и третий никогда не запускаются, я вызываю onNext на правильном эмиттере, но ничего не произошло, вызывается onNext, FlatMap не вызывается.
Данные, которые я отправляю по "onMessage", имеют идентификатор, поэтому в WebSocketConnection я знаю правильный эмиттер, но для упрощения я здесь этого не делал
Я не уверен, почему не работает, и не знаю, как я могу заархивировать это. Я хочу отправить сообщение только после того, как первый получит результат.