Безопасная публикация ConcurrentHashMap в поле класса - PullRequest
4 голосов
/ 26 января 2020

Я пытаюсь написать тест, который демонстрирует, что назначение новой ссылки на поле класса в многопоточной среде не является поточно-ориентированным и, более конкретно, имеет проблемы с видимостью, если это поле не объявлено как volatile или AtomicReference.

Сценарий, который я использую, - это класс PropertiesLoader (показанный ниже), который должен загружать набор свойств (в настоящее время используется только одно свойство), хранящихся в Map<String, String>, а также пытается поддерживать перезагрузку , Таким образом, многие потоки читают свойство, и в какой-то момент другой поток перезагружает новое значение, которое должно быть видимым для читающих потоков.

Тест предназначен для работы следующим образом:

  • он вызывает потоки считывателя, которые ожидают вращения, пока они не "увидят" изменение значения свойства
  • в В какой-то момент поток записи создает новую карту с новым значением для свойства и назначает эту карту соответствующему полю (PropertyLoader.propertiesMap)
  • , если все потоки читателя видят новое значение, тест завершается, иначе зависает навсегда.

Теперь я знаю, что, строго говоря, не существует теста, который может доказать поточность кода (или его отсутствие), но в этом случае Я чувствую, что должно быть относительно легко продемонстрировать проблему хотя бы эмпирически.

Я пытался использовать реализацию HashMap для хранения свойств, и в этом случае тест зависает, как и ожидалось, даже если я использую только один чтение ветки.

Если, однако, используется реализация ConcurrentHashMap, тест никогда не зависает, независимо от того, сколько потоков чтения используется (я также пытался беспорядочно ждать в потоках считывателя, но безуспешно).

Насколько я понимаю, тот факт, что ConcurrentHashMap является потокобезопасным, не должен влиять на видимость поля, для которого он назначен. Так что volatile / AtomicReference все еще требуется для этого поля. Однако приведенный выше тест, похоже, противоречит этому, поскольку он ведет себя так, как будто карта всегда безопасно публикуется без необходимости дополнительной синхронизации.

Неужели мое понимание неверно? Возможно ConcurrentHashMap дает некоторые дополнительные обещания синхронизации, о которых я не знаю?

Любая помощь будет принята с благодарностью.

PS Приведенный ниже код должен быть исполняемым, как и тест Junit. Я запускал его на машине с AMD Ryzen 5, Windows 10, JDK 1.8.0_201 и на второй машине i7 Intel, Fedora 30, JDK 1.8.xx (не помню точную версию JDK) с теми же результатами.

import org.junit.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

public class PropertiesLoaderTest {

    private static final String NEW_VALUE = "newValue";
    private static final String OLD_VALUE = "oldValue";
    private static final String PROPERTY = "property";

    /**
     *  Controls if the reference we are testing for visibility issues ({@link PropertiesLoader#propertyMap} will
     *  be assigned a HashMap or ConcurrentHashMap implementation during {@link PropertiesLoader#load(boolean)}
     */
    private static boolean USE_SIMPLE_MAP = false;

    @Test
    public void testReload() throws Exception {
        PropertiesLoader loader = new PropertiesLoader();
        Random random = new Random();

        int readerThreads = 5;
        int totalThreads = readerThreads + 1;

        final CountDownLatch startLatch = new CountDownLatch(1);
        final CountDownLatch finishLatch = new CountDownLatch(totalThreads);

        // start reader threads that read the property trying to see the new property value
        for (int i = 0; i < readerThreads; i++) {
            startThread("reader-thread-" + i, startLatch, finishLatch, () -> {
                while (true) {
                    String value = loader.getProperty(PROPERTY);
                    if (NEW_VALUE.equals(value)) {
                        log("Saw new value: " + value + " for property: " + PROPERTY);
                        break;
                    }
                }
            });
        }

        // start writer thread (i.e. the thread that reloads the properties)
        startThread("writer-thread", startLatch, finishLatch, () -> {
            Thread.sleep(random.nextInt(500));

            log("starting reload...");
            loader.reloadProperties();
            log("finished reload...");
        });

        log("Firing " + readerThreads + " threads and 1 writer thread...");
        startLatch.countDown();

        log("Waiting for all threads to finish...");
        finishLatch.await();
        log("All threads finished. Test successful");
    }

    static class PropertiesLoader {
        // The reference in question: this is assigned in the constructor and again when calling reloadProperties()
        // It is not volatile nor AtomicReference so there are visibility concerns
        Map<String, String> propertyMap;

        PropertiesLoader() {
            this.propertyMap = load(false);
        }

        public void reloadProperties() {
            this.propertyMap = load(true);
        }

        public String getProperty(String propertyName) {
            return propertyMap.get(propertyName);
        }

        private static Map<String, String> load(boolean isReload) {
            // using a simple HashMap always hang the test as expected: the new reference cannot be
            // seen by the reader thread

            // using a ConcurrentHashMap always allow the test to finish no matter how many reader
            // threads are used
            Map<String, String> newMap = USE_SIMPLE_MAP ? new HashMap<>() : new ConcurrentHashMap<>();
            newMap.put(PROPERTY, isReload ? NEW_VALUE : OLD_VALUE);
            return newMap;
        }
    }

    static void log(String msg) {
        //System.out.println(Thread.currentThread().getName() + " - " + msg);
    }

    static void startThread(String name, CountDownLatch start, CountDownLatch finish, ThreadTask task) {
        Thread t = new Thread(new ThreadTaskRunner(name, start, finish, task));
        t.start();
    }

    @FunctionalInterface
    interface ThreadTask {
        void execute() throws Exception;
    }

    static class ThreadTaskRunner implements Runnable {
        final CountDownLatch start;
        final CountDownLatch finish;
        final ThreadTask task;
        final String name;

        protected ThreadTaskRunner(String name, CountDownLatch start, CountDownLatch finish, ThreadTask task) {
            this.start = start;
            this.finish = finish;
            this.task = task;
            this.name = name;
        }

        @Override
        public void run() {
            try {
                Thread.currentThread().setName(name);
                start.await();
                log("thread started");
                task.execute();
                log("thread finished successfully");
            } catch (Exception e) {
                log("Error: " + e.getMessage());
            }
            finish.countDown();
        }
    }
}

1 Ответ

1 голос
/ 27 января 2020

Это немного хуже, чем вы думаете, но есть и экономия.

Немного хуже: конструкторы не синхронизируются. В этом случае это означает, что PropertiesLoader.propertyMap, созданный в конструкторе, не гарантированно будет видимым для других потоков (читателя или записывающего). Ваша спасительная льгота здесь - это CountDownLatch es, которые вы используете (они устанавливают отношение sh a happen-before), а также Thread.start (который также устанавливает отношение sh a happen-before). Кроме того, на практике «конструкторы не синхронизированы» редко являются проблемой и их трудно воспроизвести (см. Также тестовый код ниже). Для получения дополнительной информации по этому вопросу, пожалуйста, прочитайте этот вопрос . Вывод заключается в том, что PropertiesLoader.propertyMap должно быть либо volatile / AtomicReference, либо final (final может использоваться в сочетании с ConcurrentHashMap).

Причина, по которой вы не можете воспроизвести синхронизацию Проблема с ConcurrentHashMap по той же причине, по которой трудно воспроизвести проблему «конструкторы не синхронизированы»: ConcurrentHashMap использует внутреннюю синхронизацию (см. этот ответ ), которая вызывает грипп памяти sh это не только делает новые значения на карте видимыми для других потоков, но также и новое значение PropertiesLoader.propertyMap.

Обратите внимание, что volatile PropertiesLoader.propertyMap гарантирует (а не только делает это вероятным), что новые значения видимы другим потокам (ConcurrentHashMap не требуется, см. также этот ответ ). Я обычно устанавливаю такие карты на карту только для чтения (с помощью Collections.unmodifiableMap()), чтобы сообщить другим программистам, что это не обычная карта, которую можно обновлять или изменять по желанию.

Ниже еще немного тестового кода, который пытается устранить как можно больше синхронизации. Конечный результат теста точно такой же, но он также показывает побочный эффект наличия летучего логического значения в al oop и того, что ненулевое присвоение propertyMap так или иначе всегда замечается другими потоками.

package so;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class MapVisibility {

    static int readerThreadsAmount = 2;

    public static void main(String[] args) {

        ExecutorService executors = Executors.newFixedThreadPool(readerThreadsAmount);
        try {
            new MapVisibility().run(executors);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executors.shutdownNow(); // Does not work on FAIL, manually kill reader-task from task-manager.
        }
    }

    //final boolean useConcurrentMap = false;
    // When ConcurrentHashMap is used, test is always a success.
    final boolean useConcurrentMap = true;

    final boolean useStopBoolean = false;
    // When volatile stop boolean is used, test is always a success.
    //final boolean useStopBoolean = true;

    //final boolean writeToConsole = false;
    // Writing to System.out is synchronized, this can make a test succeed that would otherwise fail.
    final boolean writeToConsole = true;

    Map<String, String> propertyMap;
    // When the map is volatile, test is always a success.
    //volatile Map<String, String> propertyMap;

    final String oldValue = "oldValue";
    final String newValue = "newValue";
    final String key = "key";
    volatile boolean stop;

    void run(ExecutorService executors) throws Exception {

        IntStream.range(0,  readerThreadsAmount).forEach(i -> {
            executors.execute(new MapReader());
        });
        sleep(500); // give readers a chance to start
        setMap(oldValue);
        sleep(100); // give readers a chance to read map
        setMap(newValue);
        sleep(100); // give readers a chance to read new value in new map
        executors.shutdown();
        if (!executors.awaitTermination(100L, TimeUnit.MILLISECONDS)) {
            System.out.println("FAIL");
            stop = true;
        } else {
            System.out.println("Success");
        }
    }

    void setMap(String value) {

        Map<String, String> newMap = (useConcurrentMap ? new ConcurrentHashMap<>() : new HashMap<>());
        newMap.put(key, value);
        propertyMap = newMap;
    }

    class MapReader implements Runnable {

        @Override
        public void run() {
            print("Reader started.");
            final long startTime = System.currentTimeMillis();
            while (propertyMap == null) {
                // In worse case, this loop should never exit but it always does.
                // No idea why.
                sleep(1);
            }
            print((System.currentTimeMillis() - startTime) + " Reader got map.");
            if (useStopBoolean) {
                while (!stop) {
                    if (newValue.equals(propertyMap.get(key))) {
                        break;
                    }
                }
            } else {
                while (true) {
                    if (newValue.equals(propertyMap.get(key))) {
                        break;
                    }
                }
            }
            print((System.currentTimeMillis() - startTime) + " Reader got new value.");
        }
    }

    void print(String msg) {
        if (writeToConsole) {
            System.out.println(msg);
        }
    }

    void sleep(int timeout) {

        // instead of using Thread.sleep, do some busy-work instead.
        final long startTime = System.currentTimeMillis();
        Random r = new Random();
        @SuppressWarnings("unused")
        long loopCount = 0;
        while (System.currentTimeMillis() - startTime < timeout) {
            for (int i = 0; i < 100_000; i++) {
                double d = r.nextDouble();
                double v = r.nextDouble();
                @SuppressWarnings("unused")
                double dummy = d / v;
            }
            loopCount++;
        }
        //print("Loops: " + loopCount);
    }

}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...