Java: необходим ли изменчивый доступ, когда потоки устанавливают разные ячейки массива? - PullRequest
1 голос
/ 28 апреля 2020

Рассмотрим следующий код:

public static void main(String[] args) throws InterruptedException {
    int nThreads = 10;
    MyThread[] threads = new MyThread[nThreads];

    AtomicReferenceArray<Object> array = new AtomicReferenceArray<>(nThreads);

    for (int i = 0; i < nThreads; i++) {
        MyThread thread = new MyThread(array, i);
        threads[i] = thread;
        thread.start();
    }

    for (MyThread thread : threads)
        thread.join();

    for (int i = 0; i < nThreads; i++) {
        Object obj_i = array.get(i);
        // do something with obj_i...
    }
}

private static class MyThread extends Thread {

    private final AtomicReferenceArray<Object> pArray;
    private final int pIndex;

    public MyThread(final AtomicReferenceArray<Object> array, final int index) {
        pArray = array;
        pIndex = index;
    }

    @Override
    public void run() {
        // some entirely local time-consuming computation...
        pArray.set(pIndex, /* result of the computation */);
    }

}

Каждый MyThread вычисляет что-то полностью локально (без необходимости синхронизации с другими потоками) и записывает результат в указанную c ячейку массива. Основной поток ожидает завершения всех MyThreads, а затем извлекает результаты и что-то с ними делает.

Использование методов get и set для AtomicReferenceArray обеспечивает порядок памяти, который гарантирует, что основной поток увидит результаты, записанные MyThreads.

Однако, поскольку каждая ячейка массива записывается только один раз, и никакой MyThread не должен видеть результат, написанный любым другим MyThread, мне интересно, действительно ли эти гарантии строгого упорядочения необходимо или если следующий код, с доступом к ячейкам простого массива, всегда будет давать те же результаты, что и код выше:

public static void main(String[] args) throws InterruptedException {
    int nThreads = 10;
    MyThread[] threads = new MyThread[nThreads];

    Object[] array = new Object[nThreads];

    for (int i = 0; i < nThreads; i++) {
        MyThread thread = new MyThread(array, i);
        threads[i] = thread;
        thread.start();
    }

    for (MyThread thread : threads)
        thread.join();

    for (int i = 0; i < nThreads; i++) {
        Object obj_i = array[i];
        // do something with obj_i...
    }
}

private static class MyThread extends Thread {

    private final Object[] pArray;
    private final int pIndex;

    public MyThread(final Object[] array, final int index) {
        pArray = array;
        pIndex = index;
    }

    @Override
    public void run() {
        // some entirely local time-consuming computation...
        pArray[pIndex] = /* result of the computation */;
    }

}

С одной стороны, при доступе в простом режиме компилятор или среда выполнения могут произойдет оптимизация доступа к чтению к array в конечном l oop основного потока и заменит Object obj_i = array[i]; на Object obj_i = null; (неявная инициализация массива), так как массив не изменяется внутри этого потока. С другой стороны, я где-то читал, что Thread.join делает все изменения присоединенного потока видимыми для вызывающего потока (что было бы разумно), поэтому Object obj_i = array[i]; должен видеть ссылку на объект, назначенную i -ым MyThread .

Итак, последний код даст те же результаты, что и выше?

Ответы [ 3 ]

3 голосов
/ 28 апреля 2020

Итак, будет ли последний код иметь те же результаты, что и выше?

Да.

"Где-то", которое вы прочитали о Thread.join может быть JLS 17.4.5 (Бит "Порядок происходит до" Java Модель памяти):

Все действия в потоке происходят до любого другого потока успешно возвращается из join() в этом потоке.

Таким образом, все ваши записи в отдельные элементы будут происходить до финального join().

С учетом сказанного, я бы Настоятельно рекомендуем вам искать альтернативные способы структурирования вашей проблемы, которые не требуют, чтобы вы беспокоились о правильности кода на этом уровне детализации (см. мой другой ответ ).

2 голосов
/ 28 апреля 2020

Здесь может показаться, что более простым решением является использование среды Executor, которая обычно скрывает ненужные подробности о потоках и о том, как сохраняется результат.

Например:

ExecutorService executor = ...

List<Future<Object>> futures = new ArrayList<>();
for (int i = 0; i < nThreads; i++) {
  futures.add(executor.submit(new MyCallable<>(i)));
}
executor.shutdown();

for (int i = 0; i < nThreads; ++i) {
  array[i] = futures.get(i).get();
}

for (int i = 0; i < nThreads; i++) {
    Object obj_i = array[i];
    // do something with obj_i...
}

где MyCallable аналогично вашему MyThread:

private static class MyCallable implements Callable<Object> {

    private final int pIndex;

    public MyCallable(final int index) {
        pIndex = index;
    }

    @Override
    public Object call() {
        // some entirely local time-consuming computation...
        return /* result of the computation */;
    }

}

Это приводит к более простому и более очевидно правильному коду, потому что вы не беспокоитесь о согласованности памяти: это обрабатывается платформой. Это также дает вам большую гибкость, например, запуск его в меньшем количестве потоков, чем в рабочих элементах, повторное использование пула потоков и т. Д. c.

0 голосов
/ 28 апреля 2020

Atomi c операции необходимы для обеспечения наличия барьеров памяти, когда несколько потоков обращаются к одной и той же ячейке памяти. Без барьеров памяти между потоками не существует отношения «произошло до», и нет никакой гарантии, что основной поток увидит изменения, сделанные другими потоками, а следовательно, и перенос данных. Так что вам действительно нужны барьеры памяти для операций записи и чтения. Этого можно добиться, используя AtomicReferenceArray или синхронизированный блок для общего объекта.

Во второй программе перед операциями чтения Thread.join. Это должно удалить гонку данных. Без join вам потребуется явная синхронизация.

...