Во-первых, предупреждение об этом:
по умолчанию (т. Е. Если вы не очищаете вручную), Netty будет буферизовать байты и время от времени очищать их, когда канал готов и стратегия очистки считает, что это удобно. Это оптимизировано для производительности.
если вы хотите сбросить данные вручную, это не гарантирует, что другая сторона получит эти группы байтов таким же образом; посредники могут буферизовать вещи по пути. Вот где это может не достичь того, что вы пытаетесь сделать: ручная очистка обычно не связана с оптимизацией производительности, а с семантикой протокола.
использование стратегии ручной очистки полезно только в том случае, если вы связываете ее с семантикой протокола (например, разделителем сообщений), чтобы другая сторона знала, как разделять сообщения (это то, что Spring WebFlux делает для SSE и application/streaming+json
.
Теперь, чтобы реализовать это, Reactor предоставляет несколько операторов windowXYZ
с разными стратегиями. Flux.window(int)
основан на количестве элементов, windowTimeout(Duration)
основан на длительности и т. Д. В этом случае вы, вероятно, захотите использовать windowUntil(Predicate)
.
Давайте попробуем реализовать что-то, что сбрасывается при буферизации определенного объема данных.
Flux<DataBuffer> buffers = //...;
int maxSize = //...;
AtomicInteger currentSize = new AtomicInteger(0);
Flux<Flux<DataBuffer>> bufferWindow = buffers.windowUntil(buf -> {
if (currentSize.addAndGet(buf.readableByteCount()) < maxSize) {
return false;
}
currentSize.set(0);
return true;
});
WebClient.create()
.post()
.body((outputMessage, context) -> outputMessage.writeAndFlushWith(bufferWindow))
.retrieve();
Обратите внимание, что реализация имеет недостатки , если вы работаете с бесконечным потоком данных: он не будет сбрасываться до тех пор, пока не будет достигнута квота или пока источник не будет заполнен. Так что это может удерживать данные дольше, чем необходимо.