Можно ли использовать одного и того же исполнителя для метода подписи и асинхронной задачи?
/ 26 августа 2018

Привет У меня простой вопрос. Предположим, у меня есть класс, подобный приведенному ниже:

import lombok.Value;

import java.nio.file.Path;

class ImageResizeRequest {

    private DownloadedImage downloadedImage;

    private ImageSize imageSize;

    private Path destinationLocation;

Класс выше представляет одну задачу, отвечающую за изменение размера изображения до заданного размера.У меня много запросов на изменение размера этого изображения до разных размеров.

class ImageResizeService {

    private final Executor executor;

    Mono<List<ImageResizeResult>> resize(List<ImageResizeRequest> requests) {

        return Flux.fromIterable(requests)

    private Mono<ImageResizeResult> resize(ImageResizeRequest request) {

        return Mono.fromFuture(CompletableFuture.supplyAsync(resizeTask(request), executor));


    private Supplier<ImageResizeResult> resizeTask(ImageResizeRequest request) {
        return () -> {
            //TODO add image resize logic for example ImageMagick by Im4Java...
            /** code below call ImageMagick library
             ConvertCmd cmd = new ConvertCmd();
             IMOperation op = new IMOperation();

            //TODO add logic!!!
            return new ImageResizeResult(null, null, null, null);

Мой вопрос: как реализовать в Project Reactor параллельную независимую задачу, отвечающую за изменение размера изображения?Без реактора проекта я бы использовал список CompletableFuture:

private static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
    CompletableFuture<Void> allDoneFuture =
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
    return allDoneFuture.thenApply(v ->
                    map(future -> future.join()).

с указанным сервисом исполнителя.Кроме того, в моем примере я использую одного и того же исполнителя в методе subscribeOn и supplyAsync - это хорошая идея?

Ответы [ 2 ]

/ 27 августа 2018

Итак, весь мой процесс выглядит следующим образом:

class ImageCommandProcessingService {

    private final DownloadRequestFactory downloadRequestFactory;
    private final ImageClientDownloader imageClientDownloader;
    private final ImageResizeRequestFactory imageResizeRequestFactory;
    private final ImageResizeService imageResizeService;

    Mono<List<ImageResizeResult>> process(ResizeImageCommand resizeImageCommand) {
        return Mono.just(resizeImageCommand)
                .map(command -> downloadRequestFactory.create(command.getImageUrl().getUrl()))
                .map(downloadedImage -> imageResizeRequestFactory.createRequests(downloadedImage, resizeImageCommand.getSizes().toJavaList()))



У меня есть команда с URL-адресом изображения и набором размеров:

class ResizeImageCommand {

    private ImageUrl imageUrl;

    private Set<ImageSize> sizes;

Сначала мне нужно загрузить изображениена диске, поэтому я создаю запрос на загрузку по фабрике:

class DownloadRequestFactory {

    private final ImageLocationPathResolver resolver;

    DownloadRequest create(String url) {
        return new DownloadRequest(url, resolver.resolveDownloadedLocation(url));

Resolver - это класс, отвечающий за создание пути к временному файлу и за создание пути для измененного образа:

class ImageLocationPathResolver {

    private String temporaryImagesFolder;
    private String destinationImagesFolder;

    Path resolveDownloadedLocation(String imageUrl) {
        LocalDateTime now = LocalDateTime.now();
        String fileName = now.toString() + "_" + getFileNameExtensionFromUrl(imageUrl);
        return Paths.get(temporaryImagesFolder,getDatePaths(now.toLocalDate()), fileName);

    Path resolveDestinationLocation(ImageSize imageSize, String url) {
        String fileName = getFileNameExtensionFromUrl(url);
        return Paths.get(destinationImagesFolder, imageSize.getName(), getDatePaths(LocalDate.now()), fileName);

    private String getFileNameExtensionFromUrl(String url) {
        return StringUtils.getFilenameExtension(url);

    private String getDatePaths(LocalDate now) {
        return now.getYear() + File.pathSeparator + now.getMonth() + File.pathSeparator + now.getDayOfMonth();

Далее у меня есть клиент, отвечающий за операцию загрузки:

public interface ImageClientDownloader {

    Mono<DownloadedImage> downloadImage(DownloadRequest downloadRequest);

и реализацию:

class HttpImageClientDownloader implements ImageClientDownloader {

    private final WebClient webClient;

    HttpImageClientDownloader() {
        this.webClient = WebClient.create();

    public Mono<DownloadedImage> downloadImage(DownloadRequest downloadRequest) {
        try {
            Flux<DataBuffer> dataBuffer = webClient.get()

            Path resultFilePath = Files.createFile(downloadRequest.getLocation());
            WritableByteChannel channel = Files.newByteChannel(resultFilePath, StandardOpenOption.WRITE);
            return DataBufferUtils.write(dataBuffer, channel)
                    .then(Mono.just(new DownloadedImage(downloadRequest.getUrl(), resultFilePath, LocalDateTime.now())));

        } catch (Exception e) {
            log.error(e.getMessage(), e);
            return Mono.error(e);

Это операция ввода-вывода.Должен ли я использовать выделенный планировщик? В конце у меня есть операция изменения размера, запрос создается внутри операции карты - imageResizeRequestFactory.

0 голосов
/ 27 августа 2018

Не постоянно воссоздайте Scheduler из ExecutorService, но старайтесь обернуть его непосредственно в конструкторе.

Вам вообще не нужны CompletableFuture и subscribeOn следует применять к внутренней части flatMap, чтобы потенциально выбирать отдельные потоки для задачи изменения размера (он выбирает один поток из пула для потока, к которому он применяется):

class ImageResizeService {

  private final Executor executor; //TODO prefer an ExecutorService if possible
  private final Scheduler scheduler; //FIXME Schedulers.fromExecutor(executor)

  Mono<List<ImageResizeResult>> resize(List<ImageResizeRequest> requests) {
    //we get the requests on IO thread
    return Flux.fromIterable(requests)
            //for each request, perform asynchronous resize...
            .flatMap(r -> Mono
                //... by converting the resizeTask Callable to a Mono
                .fromCallable(r -> resizeTask(r).get())
                //... and making sure it executes on the executor

Для достижения истинногоРаспараллеливание у вас есть другой вариант: parallel().runOn():

Mono<List<ImageResizeResult>> resize(List<ImageResizeRequest> requests) {
    //we get the requests on IO thread
    return Flux.fromIterable(requests)
            //divide into N workloads
            //the executor _should_ be capable of this degree of parallelisation:
            //actually tell to run each workload on a thread picked from executor
            //here the workload are already running on their dedicated thread,
            //we can afford to block it and thus apply resize in a simpler `map`
            .map(r -> resizeTask(r).get()) //NB: the Supplier aspect can probably be removed
            //go back to a `Flux` sequence for collection into list