Возобновите загрузку файлов с помощью Spring Webflux и статических файлов, работающих в Spring - PullRequest
1 голос
/ 25 октября 2019

Я полагаю, что во всем Интернете нет ответа на этот вопрос, поскольку он, вероятно, очень сложен, но я пойду и спрошу.

В принципе, я хочу иметь перекрестную связь между несколькими приложениями Spring. Каждый из них обслуживает ресурсы статическим способом, вот ссылка на эту тему . Эта служба используется другими экземплярами приложения, которые могут загружать эти файлы по запросу (сейчас я передаю файлы через HTTP). Я смог загрузить файлы благодаря Downlolad и сохранить файл из ClientRequest с помощью ExchangeFunction в Project Reactor ТАК вопрос.

Прямо сейчас я хочу повысить свой код, чтобы в случае подключенияпроблема или приложение временно недоступно в течение заданного времени ожидания. Я могу возобновить загрузку файла. Я настроил WebClient тайм-ауты как в этой статье .

Прямо сейчас я думал, что такой код на самом деле позволит мне обрабатывать временно недоступные сервисы:

final AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(targetPath, StandardOpenOption.WRITE);

Flux<DataBuffer> fileData = Mono.just(filePath)
    .map(file -> targetPath.toFile().exists() ? targetPath.toFile().length() : 0)
    .map(bytes -> webClient
            .get()
            .uri(uri)
            .accept(MediaType.APPLICATION_OCTET_STREAM)
            .header("Range", String.format("bytes=%d-", bytes))
            .retrieve()
            .onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new CustomException("4xx error")))
            .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new CustomException("5xx error")))
            .bodyToFlux(DataBuffer.class)
    )
    .flatMapMany(Function.identity());

DataBufferUtils
        .write(fileData , fileChannel)
        .map(DataBufferUtils::release)
        .doOnError(throwable -> {
            try {
                fileChannel.force(true);
            } catch (IOException e) {
                e.printStackTrace();
            }
        })
        .retry(3)
        .doOnComplete(() -> {
            try {
                fileChannel.force(true);
            } catch (IOException e) {
                e.printStackTrace();
            }
        })
        .doOnError(e -> !(e instanceof ChannelException), e -> {
            try {
                Files.deleteIfExists(targetPath);
            } catch (IOException exc) {
                exc.printStackTrace();
            }
        })
        .doOnError(ChannelException.class, e -> {
            try {
                Files.deleteIfExists(targetPath);
            } catch (IOException exc) {
                exc.printStackTrace();
            }
        })
        .doOnTerminate(() -> {
            try {
                fileChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        })
    .blockLast();

Но, очевидно, я получаю полный стек ошибок всякий раз, когда убиваю свой второй экземпляр приложения, начиная с:

reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response
2019-10-25T15:41:53.602+0200 [ERROR] [xxx] [N/A:N/A] [r.core.publisher.Operators] { thread=reactor-http-nio-4  } Operator called default onErrorDropped
reactor.core.Exceptions$BubblingException: reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response
    at reactor.core.Exceptions.bubble(Exceptions.java:154)
    at reactor.core.publisher.Operators.onErrorDropped(Operators.java:512)
    at reactor.netty.channel.FluxReceive.onInboundError(FluxReceive.java:343)
    at reactor.netty.channel.ChannelOperations.onInboundError(ChannelOperations.java:399)
    at reactor.netty.http.client.HttpClientOperations.onInboundClose(HttpClientOperations.java:258)
    at reactor.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:121)

, а также позже в той же трассировке стека:

2019-10-25T15:41:53.602+0200 [WARN] [xxx] [N/A:N/A] [i.n.c.AbstractChannelHandlerContext] { thread=reactor-http-nio-4  } An exception 'reactor.core.Exceptions$BubblingException: reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception:
reactor.core.Exceptions$BubblingException: reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response
    at reactor.core.Exceptions.bubble(Exceptions.java:154)
    at reactor.core.publisher.Operators.onErrorDropped(Operators.java:512)
    at reactor.netty.channel.FluxReceive.onInboundError(FluxReceive.java:343)
    at reactor.netty.channel.ChannelOperations.onInboundError(ChannelOperations.java:399)
    at reactor.netty.http.client.HttpClientOperations.onInboundClose(HttpClientOperations.java:258)
    at reactor.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:121)

ИсключенияСамо по себе это не составляет большой проблемы, но суть в том, что моя загрузка не возобновляется после того, как я снова загрузил свое приложение для работы.

Так что да, мой вопрос, как я могвозможно возобновить загрузку и как / как я могу обработать такие исключения, как здесь?

Ответы [ 2 ]

0 голосов
/ 04 ноября 2019

У кода было 2 проблемы. Первым из них был способ определения размера файла в начале. Правильный способ использования AtomicLong так же, как в DataBufferUtils.write(). Второй проблемой был retry() между удаленными вызовами. Похоже, было бы неплохо использовать retryWhen() при работе с удаленными вызовами с фиксированной задержкой. Чтобы подвести итог, вот код, который позволил мне возобновить загрузку файла с правильного байта в случае проблемы подключения / приложения

final AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(targetPath, StandardOpenOption.WRITE);

AtomicLong fileSize = new AtomicLong(targetPath.toFile().length());

Flux<DataBuffer> fileDataStream = webClient
                .get()
                .uri(remoteXFerServiceTargetHost)
                .accept(MediaType.APPLICATION_OCTET_STREAM)
                .header("Range", String.format("bytes=%d-", fileSize.get()))
                .retrieve()
                .onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new CustomException("4xx error")))
                .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new CustomException("5xx error")))
                .bodyToFlux(DataBuffer.class);

DataBufferUtils
        .write(fileData , fileChannel)
        .map(DataBufferUtils::release)
        .doOnError(throwable -> {
            try {
                fileChannel.force(true);
            } catch (IOException e) {
                e.printStackTrace();
            }
        })
        .retryWhen(Retry.any().fixedBackoff(Duration.ofSeconds(5)).retryMax(5))
        .doOnComplete(() -> {
            try {
                fileChannel.force(true);
            } catch (IOException e) {
                e.printStackTrace();
            }
        })
        .doOnError(e -> !(e instanceof ChannelException), e -> {
            try {
                Files.deleteIfExists(targetPath);
            } catch (IOException exc) {
                exc.printStackTrace();
            }
        })
        .doOnError(ChannelException.class, e -> {
            try {
                Files.deleteIfExists(targetPath);
            } catch (IOException exc) {
                exc.printStackTrace();
            }
        })
        .doOnTerminate(() -> {
            try {
                fileChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        })
    .blockLast();
0 голосов
/ 01 ноября 2019

Привет, следующая реализация, которую я нашел Spring-webflux-file-upload-download

и на ее основе ниже приведен мой код для загрузки файла:

import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ZeroCopyHttpOutputMessage;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

import java.io.File;
import java.io.IOException;

@RestController
@RequestMapping("/")
public class DownloadContorller {

    @GetMapping
    public Mono<Void> downloadByWriteWith(ServerHttpResponse response) throws IOException {
        ZeroCopyHttpOutputMessage zeroCopyResponse = (ZeroCopyHttpOutputMessage) response;
        response.getHeaders().set(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=large-file.so");
        response.getHeaders().setContentType(MediaType.APPLICATION_OCTET_STREAM);

        Resource resource = new ClassPathResource("large-file.so");
        File file = resource.getFile();
        return zeroCopyResponse.writeWith(file, 0, file.length());
    }
}

Я протестировал его с помощью wget -c http://localhost:8080/ и смог возобновить загрузку с того места, где он был прерван. Я протестировал его с помощью

  1. Прерывание со стороны клиента (Ctrl + C) на терминале
  2. Остановка приложения и затем его перезапуск.

Iнадеюсь, это поможет.

...