Блокирует ли вызов join в следующем примере CompletableFuture - PullRequest
0 голосов
/ 02 июня 2019

Я пытаюсь понять CompletableFutures и цепочку вызовов, которые возвращают завершенные фьючерсы, и я создал нижеприведенный пример, который имитирует два вызова базы данных.

Предполагается, что первый метод дает завершаемое будущее со списком userIds, а затем мне нужно вызвать другой метод, предоставляющий userId для получения пользователя (в данном случае - строки).

Подвести итог:
1. получить идентификаторы
2. получить список пользователей, отвечающих этим идентификаторам.

Я создал простые методы, чтобы смоделировать ответы с подменой потоков. Пожалуйста, проверьте код ниже

public class PipelineOfTasksExample {

    private Map<Long, String> db = new HashMap<>();

    PipelineOfTasksExample() {
        db.put(1L, "user1");
        db.put(2L, "user2");
        db.put(3L, "user3");
        db.put(4L, "user4");
    }


    private CompletableFuture<List<Long>> returnUserIdsFromDb() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("building the list of Ids" + " - thread: " + Thread.currentThread().getName());
        return CompletableFuture.supplyAsync(() -> Arrays.asList(1L, 2L, 3L, 4L));
    }

    private CompletableFuture<String> fetchById(Long id) {
        CompletableFuture<String> cfId = CompletableFuture.supplyAsync(() -> db.get(id));
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("fetching id: " + id + " -> " + db.get(id) + " thread: " + Thread.currentThread().getName());
        return cfId;
    }

    public static void main(String[] args) {

        PipelineOfTasksExample example = new PipelineOfTasksExample();

        CompletableFuture<List<String>> result = example.returnUserIdsFromDb()
                .thenCompose(listOfIds ->
                        CompletableFuture.supplyAsync(
                                () -> listOfIds.parallelStream()
                                        .map(id -> example.fetchById(id).join())
                                        .collect(Collectors.toList()
                                        )
                        )
                );

        System.out.println(result.join());
    }

}

Мой вопрос заключается в том, разрушает ли вызов объединения (example.fetchById(id).join()) неблокирующую природу процесса. Если ответ положительный, как я могу решить эту проблему?

Заранее спасибо

1 Ответ

1 голос
/ 05 июня 2019

Ваш пример немного странный, так как вы замедляете основной поток в returnUserIdsFromDb(), прежде чем какая-либо операция даже запустится, и аналогично fetchById замедляет вызывающую, а не асинхронную операцию, что сводит на нет всю цель асинхронной операции.

Далее, вместо .thenCompose(listOfIds -> CompletableFuture.supplyAsync(() -> …)) вы можете просто использовать .thenApplyAsync(listOfIds -> …).

Так что лучшим примером может быть

public class PipelineOfTasksExample {
    private final Map<Long, String> db = LongStream.rangeClosed(1, 4).boxed()
        .collect(Collectors.toMap(id -> id, id -> "user"+id));

    PipelineOfTasksExample() {}

    private static <T> T slowDown(String op, T result) {
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
        System.out.println(op + " -> " + result + " thread: "
            + Thread.currentThread().getName()+ ", "
            + POOL.getPoolSize() + " threads");
        return result;
    }
    private CompletableFuture<List<Long>> returnUserIdsFromDb() {
        System.out.println("trigger building the list of Ids - thread: "
            + Thread.currentThread().getName());
        return CompletableFuture.supplyAsync(
            () -> slowDown("building the list of Ids", Arrays.asList(1L, 2L, 3L, 4L)),
            POOL);
    }
    private CompletableFuture<String> fetchById(Long id) {
        System.out.println("trigger fetching id: " + id + " thread: "
            + Thread.currentThread().getName());
        return CompletableFuture.supplyAsync(
            () -> slowDown("fetching id: " + id , db.get(id)), POOL);
    }

    static ForkJoinPool POOL = new ForkJoinPool(2);

    public static void main(String[] args) {
        PipelineOfTasksExample example = new PipelineOfTasksExample();
        CompletableFuture<List<String>> result = example.returnUserIdsFromDb()
            .thenApplyAsync(listOfIds ->
                listOfIds.parallelStream()
                    .map(id -> example.fetchById(id).join())
                    .collect(Collectors.toList()
                ),
                POOL
            );
        System.out.println(result.join());
    }
}

который печатает что-то вроде

trigger building the list of Ids - thread: main
building the list of Ids -> [1, 2, 3, 4] thread: ForkJoinPool-1-worker-1, 1 threads
trigger fetching id: 2 thread: ForkJoinPool-1-worker-0
trigger fetching id: 3 thread: ForkJoinPool-1-worker-1
trigger fetching id: 4 thread: ForkJoinPool-1-worker-2
fetching id: 4 -> user4 thread: ForkJoinPool-1-worker-3, 4 threads
fetching id: 2 -> user2 thread: ForkJoinPool-1-worker-3, 4 threads
fetching id: 3 -> user3 thread: ForkJoinPool-1-worker-2, 4 threads
trigger fetching id: 1 thread: ForkJoinPool-1-worker-3
fetching id: 1 -> user1 thread: ForkJoinPool-1-worker-2, 4 threads
[user1, user2, user3, user4]

, что может показаться удивительным количеством потоков на первый взгляд.

Ответ заключается в том, что join() может блокировать поток, но если это происходит внутри рабочего потока пула Fork / Join, эта ситуация будет обнаружена, и будет запущен новый поток компенсации, чтобы обеспечить сконфигурированный целевой параллелизм .

В особом случае, когда используется пул Fork / Join по умолчанию, реализация может подобрать новые отложенные задачи в методе join(), чтобы обеспечить прогресс в том же потоке.

Таким образом, код всегда будет прогрессировать, и нет ничего плохого в том, чтобы время от времени вызывать join(), если альтернативы намного сложнее, но существует некоторая опасность слишком большого потребления ресурсов, если его использовать чрезмерно. Ведь причиной использования пулов потоков является ограничение количества потоков.

Альтернатива - использовать цепные зависимые операции, где это возможно.

public class PipelineOfTasksExample {
    private final Map<Long, String> db = LongStream.rangeClosed(1, 4).boxed()
        .collect(Collectors.toMap(id -> id, id -> "user"+id));

    PipelineOfTasksExample() {}

    private static <T> T slowDown(String op, T result) {
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
        System.out.println(op + " -> " + result + " thread: "
            + Thread.currentThread().getName()+ ", "
            + POOL.getPoolSize() + " threads");
        return result;
    }
    private CompletableFuture<List<Long>> returnUserIdsFromDb() {
        System.out.println("trigger building the list of Ids - thread: "
            + Thread.currentThread().getName());
        return CompletableFuture.supplyAsync(
            () -> slowDown("building the list of Ids", Arrays.asList(1L, 2L, 3L, 4L)),
            POOL);
    }
    private CompletableFuture<String> fetchById(Long id) {
        System.out.println("trigger fetching id: " + id + " thread: "
            + Thread.currentThread().getName());
        return CompletableFuture.supplyAsync(
            () -> slowDown("fetching id: " + id , db.get(id)), POOL);
    }

    static ForkJoinPool POOL = new ForkJoinPool(2);

    public static void main(String[] args) {
        PipelineOfTasksExample example = new PipelineOfTasksExample();

        CompletableFuture<List<String>> result = example.returnUserIdsFromDb()
            .thenComposeAsync(listOfIds -> {
                List<CompletableFuture<String>> jobs = listOfIds.parallelStream()
                    .map(id -> example.fetchById(id))
                    .collect(Collectors.toList());
                return CompletableFuture.allOf(jobs.toArray(new CompletableFuture<?>[0]))
                    .thenApply(_void -> jobs.stream()
                        .map(CompletableFuture::join).collect(Collectors.toList()));
                },
                POOL
            );

        System.out.println(result.join());
        System.out.println(ForkJoinPool.commonPool().getPoolSize());
    }
}

Разница в том, что сначала отправляются все асинхронные задания, затем запланировано зависимое действие, вызывающее для них join, которое будет выполняться только после завершения всех заданий, поэтому эти вызовы join никогда не будут блокироваться. Только последний вызов join в конце метода main может блокировать основной поток.

Так что это печатает что-то вроде

trigger building the list of Ids - thread: main
building the list of Ids -> [1, 2, 3, 4] thread: ForkJoinPool-1-worker-1, 1 threads
trigger fetching id: 3 thread: ForkJoinPool-1-worker-1
trigger fetching id: 2 thread: ForkJoinPool-1-worker-0
trigger fetching id: 4 thread: ForkJoinPool-1-worker-1
trigger fetching id: 1 thread: ForkJoinPool-1-worker-0
fetching id: 4 -> user4 thread: ForkJoinPool-1-worker-1, 2 threads
fetching id: 3 -> user3 thread: ForkJoinPool-1-worker-0, 2 threads
fetching id: 2 -> user2 thread: ForkJoinPool-1-worker-1, 2 threads
fetching id: 1 -> user1 thread: ForkJoinPool-1-worker-0, 2 threads
[user1, user2, user3, user4]

показывает, что не нужно создавать потоки компенсации, поэтому количество потоков соответствует настроенному целевому параллелизму.

Обратите внимание, что если фактическая работа выполняется в фоновом потоке, а не в самом методе fetchById, теперь вам больше не нужен параллельный поток, поскольку нет блокирующего вызова join(). Для таких сценариев простое использование stream() обычно приводит к повышению производительности.

...