Java 8 Completable Future - параллельное выполнение - PullRequest
1 голос
/ 23 марта 2019

Я проверяю, как работает CompletableFuture. Меня интересует, как выполнять задачи параллельно:

try {
          CompletableFuture one = CompletableFuture.runAsync(() -> {
          throw new RuntimeException("error");
          });
          CompletableFuture two = CompletableFuture.runAsync(() -> System.out.println("2"));
          CompletableFuture three = CompletableFuture.runAsync(() -> System.out.println("3"));
          CompletableFuture all = CompletableFuture.allOf(one, two, three);
          all.get();
} catch (InterruptedException e) {
           System.out.println(e);
} catch (ExecutionException e) {
           System.out.println(e);
}

В этом случае все они будут выполнены.

1 . Можно ли прервать все запущенные потоки, если в одном из них есть исключение?

2 . Когда этот код находится внутри метода класса, который может быть вызван из разных потоков, он будет потокобезопасным?

Ответы [ 2 ]

2 голосов
/ 23 марта 2019

1. Можно ли прервать все запущенные потоки, если в одном из них есть исключение?

Да, это возможно.Все потоки должны иметь доступ к общему объекту, состояние которого может быть изменено и прочитано другими потоками.Это может быть например AtomicInteger.См. Пример ниже:

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

public class Dates {

    public static void main(String[] args) throws Exception {
        try {
            AtomicInteger excCounter = new AtomicInteger(0);
            CompletableFuture one = CompletableFuture.runAsync(new ExcRunnable(excCounter));
            CompletableFuture two = CompletableFuture.runAsync(new PrintRunnable("2", excCounter));
            CompletableFuture three = CompletableFuture.runAsync(new PrintRunnable("3", excCounter));
            CompletableFuture all = CompletableFuture.allOf(one, two, three);
            all.get();
        } catch (InterruptedException | ExecutionException e) {
            System.out.println(e);
        }
    }
}

class ExcRunnable implements Runnable {
    private final AtomicInteger excCounter;

    public ExcRunnable(AtomicInteger excCounter) {
        this.excCounter = excCounter;
    }

    @Override
    public void run() {
        Random random = new Random();
        int millis = (int) (random.nextDouble() * 5000);
        System.out.println("Wait " + millis);
        Threads.sleep(450);

        // Inform another threads that exc occurred
        excCounter.incrementAndGet();

        throw new RuntimeException("error");
    }
}

class PrintRunnable implements Runnable {

    private final String name;
    private final AtomicInteger excCounter;

    public PrintRunnable(String name, AtomicInteger excCounter) {
        this.name = name;
        this.excCounter = excCounter;
    }

    @Override
    public void run() {
        int counter = 10;
        while (counter-- > 0 && excCounter.get() == 0) {
            System.out.println(name);
            Threads.sleep(450);
        }
    }
}

class Threads {
    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

У нас есть 3 задачи: две, которые выводят его имя, и одна, которая выдает исключение через некоторое время.Перед выдачей исключения счетчик увеличивается, чтобы сообщить другим задачам, что одна из них не выполнена, и они должны завершить выполнение.Задания печати проверяют этот счетчик и, если условие не выполняется, они заканчивают свою работу.Когда вы комментируете excCounter.incrementAndGet(); строку, другие задачи заканчивают свою работу, не зная, что одна из них вызвала исключение.

Когда этот код находится внутри метода класса, который может быть вызван из разных потоков, будет ли он потокобезопасным?

Взгляните на определение безопасность потоков .Например, предположим, что задачи печати увеличивают общий счетчик с каждой напечатанной строкой.Если счетчик примитив int, это не безопасность потока, поскольку значение счетчика можно заменить.Но если вы используете AtomicInteger, это потокобезопасность, потому что AtomicInteger это потокобезопасность.

1 голос
/ 23 марта 2019

1

если любое из ваших асинхронных заданий выдает исключение, all.get () сгенерирует исключение.Это означает, что вы можете отменить все CF в предложении catch.Но ваши асинхронные задачи должны быть дружественными по отношению к прерываниям, т. Е. Проверять наличие периодического флага прерывания или обрабатывать InterruptedException и возвращать рано.

Отмена задачи всегда должна обрабатываться с использованием механизма прерываний

2

Все упомянутые вами ссылочные переменные являются локальными, поэтому не нужно беспокоиться о безопасности потоков.Локальные переменные всегда потокобезопасны.

...