Загрузите и сохраните файл из ClientRequest, используя ExchangeFunction в Project Reactor - PullRequest
0 голосов
/ 29 апреля 2018

У меня проблема с правильным сохранением файла после его загрузки в Project Reactor.

class HttpImageClientDownloader implements ImageClientDownloader {

    private final ExchangeFunction exchangeFunction;

    HttpImageClientDownloader() {
        this.exchangeFunction = ExchangeFunctions.create(new ReactorClientHttpConnector());
    }

    @Override
    public Mono<File> downloadImage(String url, Path destination) {

        ClientRequest clientRequest = ClientRequest.create(HttpMethod.GET, URI.create(url)).build();
        return exchangeFunction.exchange(clientRequest)
                .map(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()))
                //.flatMapMany(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()))
                .flatMap(dataBuffer -> {

                    AsynchronousFileChannel fileChannel = createFile(destination);
                    return DataBufferUtils
                            .write(dataBuffer, fileChannel, 0)
                            .publishOn(Schedulers.elastic())
                            .doOnNext(DataBufferUtils::release)
                            .then(Mono.just(destination.toFile()));


                });

    }

    private AsynchronousFileChannel createFile(Path path) {
        try {
            return AsynchronousFileChannel.open(path, StandardOpenOption.CREATE);
        } catch (Exception e) {
            throw new ImageDownloadException("Error while creating file: " + path, e);
        }
    }
}

Итак, мой вопрос: Блокирует ли DataBufferUtils.write (dataBuffer, fileChannel, 0)?

Как насчет того, когда диск работает медленно?

И второй вопрос о том, что происходит, когда возникает исключение ImageDownloadException, В doOnNext я хочу освободить данный буфер данных, это хорошее место для такой операции?

Я думаю, также эта строка:

            .map(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()))

может блокировать ...

1 Ответ

0 голосов
/ 30 апреля 2018

Вот еще один (более короткий) способ добиться этого:

Flux<DataBuffer> data = this.webClient.get()
        .uri("/greeting")
        .retrieve()
        .bodyToFlux(DataBuffer.class);

Path file = Files.createTempFile("spring", null);
WritableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.WRITE);
Mono<File> result = DataBufferUtils.write(data, channel)
        .map(DataBufferUtils::release)
        .then(Mono.just(file));

Теперь DataBufferUtils::write операции не блокируются, потому что они используют неблокирующий ввод-вывод с каналами. Запись в такие каналы означает, что он запишет все, что может, в выходной буфер (то есть может записать все DataBuffer или только его часть).

Использование Flux::map или Flux::doOnNext - правильное место для этого. Но вы правы, если возникает ошибка, вы все равно отвечаете за освобождение текущего буфера (и всех оставшихся). В Spring Framework может быть что-то, что мы можем улучшить, пожалуйста, следите за SPR-16782 .

Я не вижу, как ваш последний пример показывает что-либо блокирующее: все методы возвращают реактивные типы, и ни один не выполняет блокировку ввода-вывода.

...