Как преобразовать поток ByteBuffer в Spring BodyInserter - PullRequest
2 голосов
/ 09 мая 2020

У меня есть вариант использования для чтения файла из s3 и publi sh для обслуживания в java.
Для реализации я пытаюсь использовать awssdk s3 API для чтения файла, который возвращает Flux<ByteBuffer>, а затем publi sh для службы отдыха с использованием Spring WebClient.

Согласно моему исследованию, пружина WebClient требует BodyInserter, который можно подготовить с помощью BodyInserters.fromDataBuffers. Я не могу понять, как правильно преобразовать Flux в Flux и вызвать обмен WebClient;

Flux<ByteBuffer> byteBufferFlux = getS3File(key);
        Flux<DataBuffer> dataBufferFlux= byteBufferFlux.map(byteBuffer -> {
            ?????????????Convert bytebuffer to data buffer ??????
            return dataBuffer;
        });

        BodyInserter<Flux<DataBuffer>, ReactiveHttpOutputMessage> inserter = BodyInserters.fromDataBuffers(dataBufferFlux);

Есть предложения, как этого добиться?

Ответы [ 2 ]

1 голос
/ 09 мая 2020

Возможно, вам не нужен dataBufferFlux, и вы должны иметь возможность записывать поток в конечную точку отдыха. Попробуйте это:

  Flux<ByteBuffer> byteBufferFlux = getS3File(key);

  BodyInserter<Flux<ByteBuffer>, ReactiveHttpOutputMessage> = BodyInserters.fromPublisher(byteBufferFlux, ByteBuffer.class);
1 голос
/ 09 мая 2020

Вы можете конвертировать, используя DefaultDataBuffer, который вы можете создать с помощью DefaultDataBufferFactory

DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();

Flux<DataBuffer> buffer = getS3File(key).map(dataBufferFactory::wrap);

BodyInserter<Flux<DataBuffer>, ReactiveHttpOutputMessage> inserter =
    BodyInserters.fromDataBuffers(buffer);

Вам вообще не нужен BodyInserter, хотя при использовании Webclient вы можете использовать следующую сигнатуру метода для body()

<T, P extends Publisher<T>> RequestHeadersSpec<?> body(P publisher, Class<T> elementClass);

В который затем вы можете передать свой Flux<ByteBuffer> напрямую, указав класс для использования

    WebClient.create("http://someUrl")
            .post()
            .uri("/someUri")
            .body(getS3File(key),ByteBuffer.class)
...