WebClient Как очистить часть полученных данных? - PullRequest
0 голосов
/ 03 сентября 2018

Я хочу реализовать частичную очистку полученного контента. Например, у меня есть обработчик:

return client
    .post()
    .body(BodyInserters.fromDataBuffers(
        request.body(BodyExtractors.toDataBuffers())))
    .exchange()
    .....

Как вызвать сброс при получении некоторого количества буферов данных?

1 Ответ

0 голосов
/ 03 сентября 2018

Во-первых, предупреждение об этом:

  • по умолчанию (т. Е. Если вы не очищаете вручную), Netty будет буферизовать байты и время от времени очищать их, когда канал готов и стратегия очистки считает, что это удобно. Это оптимизировано для производительности.

  • если вы хотите сбросить данные вручную, это не гарантирует, что другая сторона получит эти группы байтов таким же образом; посредники могут буферизовать вещи по пути. Вот где это может не достичь того, что вы пытаетесь сделать: ручная очистка обычно не связана с оптимизацией производительности, а с семантикой протокола.

  • использование стратегии ручной очистки полезно только в том случае, если вы связываете ее с семантикой протокола (например, разделителем сообщений), чтобы другая сторона знала, как разделять сообщения (это то, что Spring WebFlux делает для SSE и application/streaming+json.

Теперь, чтобы реализовать это, Reactor предоставляет несколько операторов windowXYZ с разными стратегиями. Flux.window(int) основан на количестве элементов, windowTimeout(Duration) основан на длительности и т. Д. В этом случае вы, вероятно, захотите использовать windowUntil(Predicate).

Давайте попробуем реализовать что-то, что сбрасывается при буферизации определенного объема данных.

Flux<DataBuffer> buffers = //...;

int maxSize = //...;
AtomicInteger currentSize = new AtomicInteger(0);
Flux<Flux<DataBuffer>> bufferWindow = buffers.windowUntil(buf -> {
    if (currentSize.addAndGet(buf.readableByteCount()) < maxSize) {
        return false;
    }
    currentSize.set(0);
    return true;
});

WebClient.create()
        .post()
        .body((outputMessage, context) -> outputMessage.writeAndFlushWith(bufferWindow))
        .retrieve();

Обратите внимание, что реализация имеет недостатки , если вы работаете с бесконечным потоком данных: он не будет сбрасываться до тех пор, пока не будет достигнута квота или пока источник не будет заполнен. Так что это может удерживать данные дольше, чем необходимо.

...