Итак, я новичок в программировании Reactive и написал код, который хотел бы протестировать. Это скорее интеграционные тесты, так как я в режиме реального времени копирую файлы, а потом проверяю, совпадают ли они. Я MockWebServer
высмеиваю мой ответ, чтобы быть 4xx
, который хорошо обрабатывается в коде. К сожалению, я также получаю io.netty.handler.timeout.ReadTimeoutException
, что покрывает мой пользовательский WebClientResponseException
, поэтому в тесте я получаю неправильное исключение. По сути, у меня есть два вопроса, с какой стати я получаю это io.netty.handler.timeout.ReadTimeoutException
исключение? По какой-то причине он появляется только после doOnError()
метода, и я не уверен, почему это вообще происходит.
Сейчас код работает и синхронен, я хорошо это знаю.
Второй вопрос: как я могу обработать свое пользовательское исключение в тестах после заданного количества попыток? Прямо сейчас это 3, и только тогда я хотел бы, чтобы было выброшено мое другое исключение.
Вот код:
AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(targetPath, StandardOpenOption.WRITE);
Flux<DataBuffer> fileDataStream = Mono.just(filePath)
.map(file -> targetPath.toFile().exists() ? targetPath.toFile().length() : 0)
.map(bytes -> webClient
.get()
.uri(uri)
.accept(MediaType.APPLICATION_OCTET_STREAM)
.header("Range", String.format("bytes=%d-", bytes))
.retrieve()
.onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new CustomException("4xx error")))
.onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new CustomException("5xx error")))
.bodyToFlux(DataBuffer.class)
.doOnError(throwable -> log.info("fileDataStream onError", throwable))
)
.flatMapMany(Function.identity());
return DataBufferUtils
.write(fileDataStream, fileChannel)
.map(DataBufferUtils::release)
.doOnError(throwable -> {
try {
fileChannel.force(true);
} catch (IOException e) {
throw new WritingException("failed force update to file channel", e);
}
})
.retry(3)
.doOnComplete(() -> {
try {
fileChannel.force(true);
} catch (IOException e) {
log.warn("failed force update to file channel", e);
throw new WritingException("failed force update to file channel", e);
}
})
.doOnError(throwable -> targetPath.toFile().delete())
.then(Mono.just(target));
Ответ Mono<Path>
, так как меня интересует толькоPath
недавно созданного и скопированного файла.
Любые комментарии относительно кода приветствуются.
Механизм копирования был создан на основе этой темы Загрузите файл и сохраните файл из ClientRequest, используяExchangeFunction в реакторе проекта