Несколько Flowable с FlowableEmitter - PullRequest
0 голосов
/ 14 сентября 2018

Я создаю класс WebSocket для получения некоторых данных и пытаюсь использовать Flowable с FlowableEmitter для получения ответа сокета.

Я хочу удерживать эмиттер, и каждый раз, когда я получаю данные, которые я вызываю onNext и обрабатываю результат, он работает нормально, если я подписываюсь в каждом потоке, но не работает, если я использую FlatMap для отправки другого сообщения после первые текучие получают данные.

Я создаю Список эмиттеров:

var listEmitter: MutableList<FlowableEmitter<*>> = mutableListOf()

Затем я создаю функцию для регистрации наблюдателя:

inline fun <reified T> onMessage(data: ByteString?): Flowable<T> {
    return Flowable.create<T>({ emitter ->
        listEmitter.add(emitter)
        //send the data to socket
        data?.let { socket?.send(data) }
    }, BackpressureStrategy.LATEST)
}

Когда я получаю данные из сокета, я звоню:

//I called [0] here to demonstrate the logic only
((listEmitter[0].subscriber) as FlowableEmitter<Any>).onNext(true)

И в моем докладчике я пытаюсь сделать что-то вроде:

socket
    .onMessage(/*some ByteString*/)
    .flatMap {
        if (it) {
            socket.onMessage(/*other data*/)
        } else {
            Flowable.just(false)
        }
    }.flatMap {
        if (it) {
            socket.onMessage(/*other data*/)
        } else {
            Flowable.just(false)
        }
    }.subscribe { /* do some stuff */}

Итак, при этом только первый FlatMap является триггером, второй и третий никогда не запускаются, я вызываю onNext на правильном эмиттере, но ничего не произошло, вызывается onNext, FlatMap не вызывается.

Данные, которые я отправляю по "onMessage", имеют идентификатор, поэтому в WebSocketConnection я знаю правильный эмиттер, но для упрощения я здесь этого не делал

Я не уверен, почему не работает, и не знаю, как я могу заархивировать это. Я хочу отправить сообщение только после того, как первый получит результат.

...