Как развернуть SingleТекучий <Sample>? - PullRequest
0 голосов
/ 27 июня 2019

Я определил следующий код rxjava (Kotlin) для создания неблокирующего, чувствительного к обратному давлению генератора.Генератор подается неблокирующим потоком JSON из хранилища BLOB-объектов Azure через JsonPiser Jackson JSON Api, который обеспечивает очень специализированный и быстрый анализ.Мой метод downloadBlob () возвращает Single<ByteBuffer>, к которому я присоединяю генератор.Я не могу понять, почему переменная генератора сообщается как Single<Flowable<Sample>>, а не Flowable<Sample>.Буду признателен за ваши предложения по извлечению Flowable<Sample>, что мне и нужно.Я попал в стену по этому вопросу ...: (

Спасибо за вашу помощь и время.

    /**
     * Method generator parses sample data from a json stream.
     * @param sampleJsonUrl String is the URL to the Azure Blob Storage sample data.
     * @return Flowable<Sample>> is a cold, synchronous, stateful and backpressure-aware
     * generator of features.
     */
fun generator(sampleJsonUrl: String) =
            blobStorage.downloadBlob(sampleJsonUrl)
                    .map { bbuf: ByteBuffer -> JsonFactory().createParser(bbuf.array()) }
                    .map { jParser ->
                        Flowable.generate<Sample, JsonParser>(
                                java.util.concurrent.Callable { jParser.gobbleJsonToSamples() },

                                io.reactivex.functions.BiConsumer<JsonParser, Emitter<Sample>> { 
                                    parser: JsonParser, emitter: Emitter<Sample> ->
                                        pullOrComplete(parser, emitter)
                                },

                                Consumer<JsonParser> { jParser.close() }
                        )
                    }.flatMapPublisher { it }



    /**
     * Method downloadBlob will retrieve a blob from storage and return a 
     * reactive stream of bytes.
     * @param url String is the url to the blob.
     * @return Single<ByteBuffer> is the blob's content.
     */
    override fun downloadBlob(url: String): Single<ByteBuffer> =

            BlockBlobURL(URL(url), pipeline)
                    .download(null, null, false, null)
                    .flatMap {
                        FlowableUtil
                        .collectBytesInBuffer(
                            it.body(ReliableDownloadOptions().withMaxRetryRequests(3))
                        )
                    }
...