Как избежать этого предупреждения компилятора о закрытии OutputStream при использовании CompletableFuture? - PullRequest
0 голосов
/ 26 апреля 2018

Я использую CompletableFuture для асинхронной операции, чтобы загрузить файл и сохранить его содержимое с помощью OutputStream. Приведенный ниже код работает, но компилятор дает мне предупреждение либо использовать try-with-resources, либо закрыть OutputStream в finally-clause, хотя он закрывается после завершения Future в whenComplete.

Код:

final OutputStream outputStream = Files.newOutputStream(file.toPath());
final String url = "https://example.com/some-download.zip";
final CompletionStage<WSResponse> futureResponse = this.client
        .url(url)
        .setMethod("GET")
        .stream();

futureResponse.thenCompose(res -> {
    downloadTask.setTotalBytes(res);
    Source<ByteString, ?> responseBody = res.getBodyAsSource();

    Sink<ByteString, CompletionStage<akka.Done>> outputWriter =
            Sink.foreach(bytes -> {
                downloadTask.addReceivedBytes(bytes.size());
                System.out.println(downloadTask.getProgressAsString());
                outputStream.write(bytes.toArray());
            });

    return responseBody.runWith(outputWriter, this.materializer);
}).whenComplete((res, error) -> {
    try {
        outputStream.close();
    } catch (final IOException e) {
        e.printStackTrace();
    }
});

Внимание:

enter image description here

Проблема:

Когда я использую try-with-resources, программа закрывает OutPutStream, прежде чем что-либо записывается в файл, из-за асинхронной природы CompletableFuture, которая не блокируется.

Так есть ли способ объявить OutputStream внутри CompletionStage и передать его по строке?

1 Ответ

0 голосов
/ 28 апреля 2018

Предложение @AndyTurner в его комментарии было правильным. Мне просто нужно немного подправить код, чтобы он заработал.

Я не осознавал, что responseBody.runWith() возвращает еще один CompletionStage, поэтому при использовании try-with-resources или finally закрывается OutputStream в блоке thenComponse, поэтому возвращается CompletionStage (из * 1010). *) больше не мог писать. Это вызвало ошибку. Поэтому нам просто нужно обработать responseBody.runWith() «синхронно» в одном и том же блоке, используя .toCompletableFuture().get(). Это не проблема, потому что сам блок выполняется в другом потоке, то есть он остается асинхронным. Поскольку мы больше ничего не возвращаем, нам также нужно использовать thenAccept, который принимает Consumer вместо thenCompose, который принимает Function.

final String url = "https://example.com/some-download.zip";
final CompletionStage<WSResponse> futureResponse = this.client
        .url(url)
        .setMethod("GET")
        .stream();

futureResponse
        .thenAccept(res -> {
            try (OutputStream outputStream = Files.newOutputStream(file.toPath())) {
                downloadTask.setTotalBytes(res);
                Source<ByteString, ?> responseBody = res.getBodyAsSource();

                Sink<ByteString, CompletionStage<akka.Done>> outputWriter =
                        Sink.foreach(bytes -> {
                            downloadTask.addReceivedBytes(bytes.size());
                            System.out.println(downloadTask.getProgressAsString());
                            outputStream.write(bytes.toArray());
                        });
                responseBody.runWith(outputWriter, this.materializer).toCompletableFuture().get();
            } catch (IOException | InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
...