Планирование задач с использованием acceptEither синхронно в CompletableFuture - PullRequest
1 голос
/ 18 апреля 2020

Я изучаю параллелизм через API CompletableFuture. Допустим, у меня есть две задачи: одна занимает 250 мс, а другая - 2500 мс. В следующем коде:

        Supplier<List<Long>> supplyIds = () -> {
            sleep(200);
            return(Arrays.asList(1L, 2L, 3L));
        };

        Function<List<Long>, CompletableFuture<List<User>>> fetchUsers1 = idList -> {
            sleep(250);
            System.out.println("User2"+ Thread.currentThread().getName());
            Supplier<List<User>> userSupplier = () ->  idList.stream().map(User::new).collect(Collectors.toList());
            return(CompletableFuture.supplyAsync(userSupplier));
        };
        Function<List<Long>, CompletableFuture<List<User>>> fetchUsers2 = idList -> {
            sleep(2500);
            System.out.println("User2"+ Thread.currentThread().getName());
            Supplier<List<User>> userSupplier = () -> idList.stream().map(User::new).collect(Collectors.toList());
            return(CompletableFuture.supplyAsync(userSupplier));
        };
        Consumer<List<User>> displayer = users -> {
            users.forEach(System.out::println);
        };

        CompletableFuture<List<Long>> completableFuture = CompletableFuture.supplyAsync(supplyIds);
        CompletableFuture<List<User>> users1 = completableFuture.thenCompose(fetchUsers1);
        CompletableFuture<List<User>> users2 = completableFuture.thenCompose(fetchUsers2);

        users1.thenRun(()-> System.out.println("User 1"));
        users2.thenRun(()-> System.out.println("User 2"));
        users1.acceptEither(users2, displayer);

        sleep(6000);

я получаю следующий результат:

User2ForkJoinPool.commonPool-worker-1
User 2
1
2
3
User2ForkJoinPool.commonPool-worker-1
User 1

Я понимаю, что код выполняется синхронно, так как используется тот же общий поток пула объединения вил, и мы не указывайте тему. Меня смущает вопрос, почему сначала выполняется задача fetchUsers2, а затем задача fetchUsers1 (похоже, это соответствует каждому запуску). Я предположил, что, поскольку thenCompose вызывается на fetchUsers1 первым в коде, он сначала будет "поставлен в очередь".

1 Ответ

2 голосов
/ 21 апреля 2020

Ничто в документации не говорит о том, что порядок вызова имеет значение для thenCompose.

Поскольку вы определяете две независимые стадии, обе только в зависимости от completableFuture, не существует определенного порядок между users1 и user2, и результирующий порядок зависит только от реализации.

Вы можете воспроизводимо получить определенный заказ в одной среде, но другой порядок в другой среде. Даже в вашей среде можно получить другой заказ в некоторых сериях. Если инициирующий поток теряет ЦП после вызова supplyAsync(supplyIds) в течение 200 миллисекунд, он может выполнить действие, указанное с помощью thenCompose(fetchUsers1), непосредственно при вызове, до вызова thenCompose(fetchUsers2).

Когда порядок между двумя действия имеют значение, вы должны смоделировать зависимость между ними.

Обратите внимание, что аналогичным образом код

users1.thenRun(()-> System.out.println("User 1"));
users2.thenRun(()-> System.out.println("User 2"));
users1.acceptEither(users2, displayer);

определяет полностью независимые действия. Поскольку acceptEither применяется к users1 и users2, а не к этапам завершения, возвращаемым вызовами thenRun, он не зависит от завершения операторов печати. Эти три действия могут быть выполнены в любом порядке.

...