Здесь происходит то, что ваш клиент запрашивает полностью полученный (агрегированный) HttpResponse, оборачивая байтовый массив, который затем преобразуется в InputStream.Чтобы получить байты ответа без агрегирования, вам необходимо запросить один из реактивных типов, например org.reactivestreams.Publisher
(или его подходящий подкласс), равный ByteBuffer
с.Затем вам нужно обработать их.
Пример:
Flowable<ByteBuffer<?>> getQueryResult(String jobId, String resultId);
Затем вы можете запустить map
, forEach
, blockingForEach
и т. Д. Для этого io.reactivex.Flowable
- НО ПОМНИТЕЧтобы освободить буферы, или вы создадите много мусора, и получите неприятные сообщения журнала.Пример (в Groovy):
Flowable<ByteBuffer<?>> responseFlowable = myClient.getQueryResult("job1", "foo")
int sum = 0
responseFlowable.blockingForEach { ByteBuffer byteBuffer ->
sum += byteBuffer.toByteArray().count('!')
((ReferenceCounted)byteBuffer).release() // Let Netty do its thing!
}
(Очевидно, блокирование плохо для высокой пропускной способности, но это только пример)
Надеюсь, это поможет.