Я определил следующий код rxjava (Kotlin) для создания неблокирующего, чувствительного к обратному давлению генератора.Генератор подается неблокирующим потоком JSON из хранилища BLOB-объектов Azure через JsonPiser Jackson JSON Api, который обеспечивает очень специализированный и быстрый анализ.Мой метод downloadBlob () возвращает Single<ByteBuffer>
, к которому я присоединяю генератор.Я не могу понять, почему переменная генератора сообщается как Single<Flowable<Sample>>
, а не Flowable<Sample>
.Буду признателен за ваши предложения по извлечению Flowable<Sample>
, что мне и нужно.Я попал в стену по этому вопросу ...: (
Спасибо за вашу помощь и время.
/**
* Method generator parses sample data from a json stream.
* @param sampleJsonUrl String is the URL to the Azure Blob Storage sample data.
* @return Flowable<Sample>> is a cold, synchronous, stateful and backpressure-aware
* generator of features.
*/
fun generator(sampleJsonUrl: String) =
blobStorage.downloadBlob(sampleJsonUrl)
.map { bbuf: ByteBuffer -> JsonFactory().createParser(bbuf.array()) }
.map { jParser ->
Flowable.generate<Sample, JsonParser>(
java.util.concurrent.Callable { jParser.gobbleJsonToSamples() },
io.reactivex.functions.BiConsumer<JsonParser, Emitter<Sample>> {
parser: JsonParser, emitter: Emitter<Sample> ->
pullOrComplete(parser, emitter)
},
Consumer<JsonParser> { jParser.close() }
)
}.flatMapPublisher { it }
/**
* Method downloadBlob will retrieve a blob from storage and return a
* reactive stream of bytes.
* @param url String is the url to the blob.
* @return Single<ByteBuffer> is the blob's content.
*/
override fun downloadBlob(url: String): Single<ByteBuffer> =
BlockBlobURL(URL(url), pipeline)
.download(null, null, false, null)
.flatMap {
FlowableUtil
.collectBytesInBuffer(
it.body(ReliableDownloadOptions().withMaxRetryRequests(3))
)
}