Я пытаюсь написать тест, который демонстрирует, что назначение новой ссылки на поле класса в многопоточной среде не является поточно-ориентированным и, более конкретно, имеет проблемы с видимостью, если это поле не объявлено как volatile
или AtomicReference
.
Сценарий, который я использую, - это класс PropertiesLoader
(показанный ниже), который должен загружать набор свойств (в настоящее время используется только одно свойство), хранящихся в Map<String, String>
, а также пытается поддерживать перезагрузку , Таким образом, многие потоки читают свойство, и в какой-то момент другой поток перезагружает новое значение, которое должно быть видимым для читающих потоков.
Тест предназначен для работы следующим образом:
- он вызывает потоки считывателя, которые ожидают вращения, пока они не "увидят" изменение значения свойства
- в В какой-то момент поток записи создает новую карту с новым значением для свойства и назначает эту карту соответствующему полю (
PropertyLoader.propertiesMap
) - , если все потоки читателя видят новое значение, тест завершается, иначе зависает навсегда.
Теперь я знаю, что, строго говоря, не существует теста, который может доказать поточность кода (или его отсутствие), но в этом случае Я чувствую, что должно быть относительно легко продемонстрировать проблему хотя бы эмпирически.
Я пытался использовать реализацию HashMap
для хранения свойств, и в этом случае тест зависает, как и ожидалось, даже если я использую только один чтение ветки.
Если, однако, используется реализация ConcurrentHashMap
, тест никогда не зависает, независимо от того, сколько потоков чтения используется (я также пытался беспорядочно ждать в потоках считывателя, но безуспешно).
Насколько я понимаю, тот факт, что ConcurrentHashMap
является потокобезопасным, не должен влиять на видимость поля, для которого он назначен. Так что volatile
/ AtomicReference
все еще требуется для этого поля. Однако приведенный выше тест, похоже, противоречит этому, поскольку он ведет себя так, как будто карта всегда безопасно публикуется без необходимости дополнительной синхронизации.
Неужели мое понимание неверно? Возможно ConcurrentHashMap
дает некоторые дополнительные обещания синхронизации, о которых я не знаю?
Любая помощь будет принята с благодарностью.
PS Приведенный ниже код должен быть исполняемым, как и тест Junit. Я запускал его на машине с AMD Ryzen 5, Windows 10, JDK 1.8.0_201 и на второй машине i7 Intel, Fedora 30, JDK 1.8.xx (не помню точную версию JDK) с теми же результатами.
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
public class PropertiesLoaderTest {
private static final String NEW_VALUE = "newValue";
private static final String OLD_VALUE = "oldValue";
private static final String PROPERTY = "property";
/**
* Controls if the reference we are testing for visibility issues ({@link PropertiesLoader#propertyMap} will
* be assigned a HashMap or ConcurrentHashMap implementation during {@link PropertiesLoader#load(boolean)}
*/
private static boolean USE_SIMPLE_MAP = false;
@Test
public void testReload() throws Exception {
PropertiesLoader loader = new PropertiesLoader();
Random random = new Random();
int readerThreads = 5;
int totalThreads = readerThreads + 1;
final CountDownLatch startLatch = new CountDownLatch(1);
final CountDownLatch finishLatch = new CountDownLatch(totalThreads);
// start reader threads that read the property trying to see the new property value
for (int i = 0; i < readerThreads; i++) {
startThread("reader-thread-" + i, startLatch, finishLatch, () -> {
while (true) {
String value = loader.getProperty(PROPERTY);
if (NEW_VALUE.equals(value)) {
log("Saw new value: " + value + " for property: " + PROPERTY);
break;
}
}
});
}
// start writer thread (i.e. the thread that reloads the properties)
startThread("writer-thread", startLatch, finishLatch, () -> {
Thread.sleep(random.nextInt(500));
log("starting reload...");
loader.reloadProperties();
log("finished reload...");
});
log("Firing " + readerThreads + " threads and 1 writer thread...");
startLatch.countDown();
log("Waiting for all threads to finish...");
finishLatch.await();
log("All threads finished. Test successful");
}
static class PropertiesLoader {
// The reference in question: this is assigned in the constructor and again when calling reloadProperties()
// It is not volatile nor AtomicReference so there are visibility concerns
Map<String, String> propertyMap;
PropertiesLoader() {
this.propertyMap = load(false);
}
public void reloadProperties() {
this.propertyMap = load(true);
}
public String getProperty(String propertyName) {
return propertyMap.get(propertyName);
}
private static Map<String, String> load(boolean isReload) {
// using a simple HashMap always hang the test as expected: the new reference cannot be
// seen by the reader thread
// using a ConcurrentHashMap always allow the test to finish no matter how many reader
// threads are used
Map<String, String> newMap = USE_SIMPLE_MAP ? new HashMap<>() : new ConcurrentHashMap<>();
newMap.put(PROPERTY, isReload ? NEW_VALUE : OLD_VALUE);
return newMap;
}
}
static void log(String msg) {
//System.out.println(Thread.currentThread().getName() + " - " + msg);
}
static void startThread(String name, CountDownLatch start, CountDownLatch finish, ThreadTask task) {
Thread t = new Thread(new ThreadTaskRunner(name, start, finish, task));
t.start();
}
@FunctionalInterface
interface ThreadTask {
void execute() throws Exception;
}
static class ThreadTaskRunner implements Runnable {
final CountDownLatch start;
final CountDownLatch finish;
final ThreadTask task;
final String name;
protected ThreadTaskRunner(String name, CountDownLatch start, CountDownLatch finish, ThreadTask task) {
this.start = start;
this.finish = finish;
this.task = task;
this.name = name;
}
@Override
public void run() {
try {
Thread.currentThread().setName(name);
start.await();
log("thread started");
task.execute();
log("thread finished successfully");
} catch (Exception e) {
log("Error: " + e.getMessage());
}
finish.countDown();
}
}
}