Как можно передать threadpoolexecutor в CompletableFuture? - PullRequest
3 голосов
/ 19 мая 2019

В последнее время я работал над Java CompletableFuture и обнаружил, что мы всегда должны использовать настроенный Threadpool.С его помощью я нашел два способа передачи пула потоков в существующий код.Как показано ниже

Это мой ThreadPool в файле конфигурации

@Override
@Bean(name = "commonThreadPool")
public Executor getAsyncExecutor() {
  return new ThreadPoolTaskExecutor();
}

1.Передав существующий ThreadPool в аргумент.

 @Autowired
 @Qualifier("commonThreadPool") 
 TaskExecutor existingThreadPool;       
 CompletableFuture.runAsync(() -> executeTask(),existingThreadPool);

2.Используя async, как показано ниже

@Async("commonThreadPool")
public void executeTask() {
// Execute Some Task
}

, существует ли третий способ, где я могу написать обработчик CompletableFuture или переопределить его существующее поведение в одном месте, где я могу передать пользовательский Threadpool.И после этого, где бы я ни использовал приведенный ниже код, он должен выбрать мой существующий ThreadPool вместо пула forkJoin.

 CompletableFuture.runAsync(() -> executeTask());

Ответы [ 2 ]

1 голос
/ 19 мая 2019

Я бы настоятельно рекомендовал против этого, но если вы действительно хотите, вы можете использовать рефлексию, чтобы изменить пул потоков, используемый завершаемым будущим.

public static void main(String[] args) throws Exception {
    // Prints ForkJoinPool.commonPool-worker-1
    CompletableFuture<Void> c = CompletableFuture.runAsync(() -> System.out.println(Thread.currentThread().getName()));
    c.get();

    setFinalStatic(CompletableFuture.class.getDeclaredField("asyncPool"), Executors.newFixedThreadPool(10));

    // Prints pool-1-thread-1
    c = CompletableFuture.runAsync(() -> System.out.println(Thread.currentThread().getName()));
    c.get();
}

static void setFinalStatic(Field field, Object newValue) throws Exception {
    field.setAccessible(true);
    Field modifiersField = Field.class.getDeclaredField("modifiers");
    modifiersField.setAccessible(true);
    modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
    field.set(null, newValue);
}

setFinalStatic взято из https://stackoverflow.com/a/3301720/1398418

0 голосов
/ 29 мая 2019

Не существует стандартного способа замены исполнителя по умолчанию для всех экземпляров CompletableFuture. Но начиная с Java 9, вы можете определить исполнителя по умолчанию для подклассов. Например. с

public class MyCompletableFuture<T> extends CompletableFuture<T> {
    static final Executor EXEC = r -> {
        System.out.println("executing "+r);
        new Thread(r).start();
    };

    @Override
    public Executor defaultExecutor() {
        return EXEC;
    }

    @Override
    public <U> CompletableFuture<U> newIncompleteFuture() {
        return new MyCompletableFuture<>();
    }

    public static CompletableFuture<Void> runAsync​(Runnable runnable) {
        Objects.requireNonNull(runnable);
        return supplyAsync(() -> {
            runnable.run();
            return null;
        });
    }

    public static <U> CompletableFuture<U> supplyAsync​(Supplier<U> supplier) {
        return new MyCompletableFuture<U>().completeAsync(supplier);
    }
}

вы сделали все необходимые шаги для определения исполнителя по умолчанию для всех связанных этапов MyCompletableFuture. Удержание исполнителя в EXEC служит только примером, производя распечатку при использовании, поэтому, когда вы используете этот пример класса, такой как

MyCompletableFuture.supplyAsync(() -> "test")
    .thenApplyAsync(String::toUpperCase)
    .thenAcceptAsync(System.out::println);

это напечатает

executing java.util.concurrent.CompletableFuture$AsyncSupply@65ab7765
executing java.util.concurrent.CompletableFuture$UniApply@119d7047
executing java.util.concurrent.CompletableFuture$UniAccept@404b9385
TEST
...