Расширение java ThreadLocal для сброса значений во всех потоках - PullRequest
1 голос
/ 01 февраля 2012

После просмотра этого вопроса я думаю, что хочу обернуть ThreadLocal, чтобы добавить поведение сброса.

Я хочу иметь что-то похожее на ThreadLocal, с методом, который я могу вызвать из любого потока, чтобы установить все значения обратно на одно и то же значение. Пока у меня есть это:

public class ThreadLocalFlag {

    private ThreadLocal<Boolean> flag;
    private List<Boolean> allValues = new ArrayList<Boolean>();

    public ThreadLocalFlag() {
        flag = new ThreadLocal<Boolean>() {
            @Override protected Boolean initialValue() {
                Boolean value = false;
                allValues.add(value);
                return value;
            }
        };
    }

    public boolean get() {
        return flag.get();
    }

    public void set(Boolean value) {
        flag.set(value);
    }

    public void setAll(Boolean value) {
        for (Boolean tlValue : allValues) {
            tlValue = value;
        }
    }
}

Я обеспокоен тем, что автобокс примитива может означать, что копии, которые я сохранил в списке, не будут ссылаться на те же переменные, на которые ссылается ThreadLocal, если я попытаюсь установить их. Я еще не тестировал этот код, и с чем-то хитрым, как это, я ищу совет специалиста, прежде чем я продолжу этот путь.

Кто-то спросит: «Зачем ты это делаешь?». Я работаю в среде, где есть другие потоки, которые вызывают обратный вызов в моем коде, и у меня нет ссылок на них. Периодически я хочу обновить значение в переменной ThreadLocal, которую они используют, поэтому для выполнения этого обновления требуется, чтобы поток, использующий переменную, выполнил обновление. Мне просто нужен способ уведомить все эти потоки, что их переменная ThreadLocal устарела.


Я польщен тем, что в последнее время появилась новая критика в отношении этого трехлетнего вопроса, хотя я чувствую, что его тон немного меньше профессионального. Решение, которое я предоставил, работало без происшествий в течение этого времени. Однако должны быть лучшие способы достижения цели, которая поставила этот вопрос, и я приглашаю критиков дать ответ, который явно лучше. С этой целью я постараюсь быть более ясным в отношении проблемы, которую пытался решить.

Как я упоминал ранее, я использовал фреймворк, где несколько потоков используют мой код, вне моего контроля. Эта среда была QuickFIX / J, и я реализовывал интерфейс приложения . Этот интерфейс определяет ловушки для обработки сообщений FIX, и в моем случае инфраструктура была настроена на многопоточность, чтобы каждое соединение FIX с приложением могло обрабатываться одновременно.

Тем не менее, среда QuickFIX / J использует только один экземпляр моей реализации этого интерфейса для всех потоков. Я не контролирую, как потоки запускаются, и каждый обслуживает отдельное соединение с различными деталями конфигурации и другим состоянием. Было естественно позволить некоторым из этого состояния, к которому часто обращаются, но редко обновляют, жить в различных ThreadLocal s, которые загружают свое начальное значение, как только среда запускает поток.

В других частях организации у нас был библиотечный код, позволяющий нам регистрироваться для обратных вызовов для уведомления о деталях конфигурации, которые изменяются во время выполнения. Я хотел зарегистрироваться для этого обратного вызова, и когда я получил его, я хотел, чтобы все потоки знали, что пришло время перезагрузить значения этих ThreadLocal s, поскольку они, возможно, изменились. Этот обратный вызов происходит из потока, который я не контролирую, как потоки QuickFIX / J.

Мое решение, приведенное ниже, использует ThreadLocalFlag (обернутый ThreadLocal<AtomicBoolean>) исключительно для того, чтобы сигнализировать другим потокам, что, возможно, пришло время обновить их значения. Обратный вызов вызывает setAll(true), а потоки QuickFIX / J вызывают set(false), когда они начинают свое обновление. Я преуменьшил проблемы параллелизма ArrayList, поскольку список добавлялся только во время запуска, и мой вариант использования был меньше размера списка по умолчанию.

Я полагаю, что ту же задачу можно выполнить с помощью других методов межпотоковой коммуникации, но для того, что она делает, это казалось более практичным. Я приветствую другие решения.

Ответы [ 3 ]

7 голосов
/ 27 марта 2013

Взаимодействие с объектами в ThreadLocal между потоками

Я сразу скажу, что это плохая идея.ThreadLocal - это специальный класс , который обеспечивает преимущества скорости и безопасности потоков , если используется правильно .Попытка установить связь между потоками с помощью ThreadLocal в первую очередь сводит на нет цель использования класса.

Если вам нужен доступ к объекту в нескольких потоках, для этого есть инструменты, в частности, для потоков.безопасные коллекции в java.util.collect.concurrent, такие как ConcurrentHashMap, которые можно использовать для репликации ThreadLocal, используя Thread объекты в качестве ключей, например:

ConcurrentHashMap<Thread, AtomicBoolean> map = new ConcurrentHashMap<>();

// pass map to threads, let them do work, using Thread.currentThread() as the key

// Update all known thread's flags
for(AtomicBoolean b : map.values()) {
    b.set(true);
}

Более четкий, более краткий и избегающий использования ThreadLocal способом, для которого он просто не предназначен.

Уведомление потоков о том, что их данные устарели

Мне просто нужноспособ уведомить все эти потоки, что их переменная ThreadLocal устарела.

Если ваша цель - просто уведомить другие потоки о том, что что-то изменилось, вам вообще не нужен ThreadLocal.Просто используйте один AtomicBoolean и поделитесь им со всеми своими задачами, точно так же, как и с ThreadLocal<AtomicBoolean>.Как следует из названия, обновления для AtomicBoolean являются атомарными и видимыми перекрестными потоками.Еще лучше было бы использовать реальное вспомогательное средство синхронизации, такое как CyclicBarrier или Phaser, но для простых случаев использования нет никакого вреда, если просто использовать AtomicBoolean.

Создание обновляемого "ThreadLocal"

Все это говорит о том, что если вы действительно хотите реализовать глобально обновляемое ThreadLocal, ваша реализация нарушена.Тот факт, что вы не столкнулись с проблемами, является лишь совпадением, и в будущем рефакторинг может привести к трудным для диагностики ошибкам или сбоям.То, что « сработало без происшествий * » означает, что ваши тесты не завершены.

  • Прежде всего, ArrayList не является поточно-ориентированным.Вы просто не можете использовать его (без внешней синхронизации), когда с ним могут взаимодействовать несколько потоков, даже если они будут это делать в разное время.То, что вы сейчас не видите проблем, является просто совпадением.
  • Хранение объектов в виде List не позволяет нам удалять устаревшие значения.Если вы позвоните ThreadLocal.set(), он добавится в ваш список, не удаляя предыдущее значение, что приводит как к утечке памяти, так и к потенциальным неожиданным побочным эффектам, если вы ожидаете, что эти объекты станут недоступными после завершения потока, как это обычно происходит сThreadLocal экземпляров.Ваш вариант использования позволяет избежать этой проблемы по совпадению, но по-прежнему нет необходимости использовать List.

Вот реализация IterableThreadLocal, которая безопасно хранит и обновляет все существующие экземпляры *Значения 1066 *, и работает для любого типа, который вы решите использовать:

import java.util.Iterator;
import java.util.concurrent.ConcurrentMap;

import com.google.common.collect.MapMaker;

/**
 * Class extends ThreadLocal to enable user to iterate over all objects
 * held by the ThreadLocal instance.  Note that this is inherently not
 * thread-safe, and violates both the contract of ThreadLocal and much
 * of the benefit of using a ThreadLocal object.  This class incurs all
 * the overhead of a ConcurrentHashMap, perhaps you would prefer to
 * simply use a ConcurrentHashMap directly instead?
 * 
 * If you do really want to use this class, be wary of its iterator.
 * While it is as threadsafe as ConcurrentHashMap's iterator, it cannot
 * guarantee that all existing objects in the ThreadLocal are available
 * to the iterator, and it cannot prevent you from doing dangerous
 * things with the returned values.  If the returned values are not
 * properly thread-safe, you will introduce issues.
 */
public class IterableThreadLocal<T> extends ThreadLocal<T>
                                    implements Iterable<T> {
    private final ConcurrentMap<Thread,T> map;

    public IterableThreadLocal() {
        map = new MapMaker().weakKeys().makeMap();
    }

    @Override
    public T get() {
        T val = super.get();
        map.putIfAbsent(Thread.currentThread(), val);
        return val;
    }

    @Override
    public void set(T value) {
        map.put(Thread.currentThread(), value);
        super.set(value);
    }

    /**
     * Note that this method fundamentally violates the contract of
     * ThreadLocal, and exposes all objects to the calling thread.
     * Use with extreme caution, and preferably only when you know
     * no other threads will be modifying / using their ThreadLocal
     * references anymore.
     */
    @Override
    public Iterator<T> iterator() {
        return map.values().iterator();
    }
}

Как вы можете надеяться, вы увидите, что это всего лишь обертка вокруг ConcurrentHashMap, и на нее накладываются те же накладные расходы, что и наиспользуя его напрямую, но скрытый в реализации ThreadLocal, который пользователи обычно ожидают быть быстрым и поточно-ориентированным.Я реализовал это в демонстрационных целях, но я действительно не могу рекомендовать использовать его в любых условиях.

1 голос
/ 01 февраля 2012

Это не будет хорошей идеей, поскольку весь смысл локального хранилища потока, ну, в общем, локальность потока значения, которое оно содержит - то есть вы можете быть уверены, что ни один другой поток, кроме вашего собственного, не может коснутьсяЗначение.Если другие потоки могут коснуться локального значения вашего потока, это больше не будет "локальным потоком", и это нарушит контракт модели памяти локального хранилища потока.

Либо вам придется использоватьчто-то иное, чем ThreadLocal (например, ConcurrentHashMap) для хранения значения, или вам нужно найти способ запланировать обновление для обсуждаемых потоков.

Вы можете использовать создатель карты Google Guava для создания статическогоfinal ConcurrentWeakReferenceIdentityHashmap со следующим типом: Map<Thread, Map<String, Object>>, где вторая карта - ConcurrentHashMap.Таким образом, вы будете очень близки к ThreadLocal, за исключением того, что вы сможете перебирать карту.

0 голосов
/ 01 февраля 2012

Я разочарован качеством ответов, полученных на этот вопрос;Я нашел свое собственное решение.

Я написал свой тестовый пример сегодня и обнаружил, что единственная проблема с кодом в моем вопросе - Boolean.Boolean не является изменяемым, поэтому мой список ссылок не принес мне пользы.Я посмотрел на этот вопрос и изменил свой код на AtomicBoolean, и теперь все работает как положено.

public class ThreadLocalFlag {

    private ThreadLocal<AtomicBoolean> flag;
    private List<AtomicBoolean> allValues = new ArrayList<AtomicBoolean>();

    public ThreadLocalFlag() {
        flag = new ThreadLocal<AtomicBoolean>() {
            @Override protected AtomicBoolean initialValue() {
                AtomicBoolean value = new AtomicBoolean();
                allValues.add(value);
                return value;
            }
        };
    }

    public boolean get() {
        return flag.get().get();
    }

    public void set(boolean value) {
        flag.get().set(value);
    }

    public void setAll(boolean value) {
        for (AtomicBoolean tlValue : allValues) {
            tlValue.set(value);
        }
    }
}

Контрольный пример:

public class ThreadLocalFlagTest {

    private static ThreadLocalFlag flag = new ThreadLocalFlag();
    private static boolean runThread = true;

    @AfterClass
    public static void tearDownOnce() throws Exception {
        runThread = false;
        flag = null;
    }

    /**
     * @throws Exception if there is any issue with the test
     */
    @Test
    public void testSetAll() throws Exception {
        startThread("ThreadLocalFlagTest-1", false);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            //ignore
        }
        startThread("ThreadLocalFlagTest-2", true);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            //ignore
        }
        startThread("ThreadLocalFlagTest-3", false);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            //ignore
        }
        startThread("ThreadLocalFlagTest-4", true);
        try {
            Thread.sleep(8000L); //watch the alternating values
        } catch (InterruptedException e) {
            //ignore
        }
        flag.setAll(true);
        try {
            Thread.sleep(8000L); //watch the true values
        } catch (InterruptedException e) {
            //ignore
        }
        flag.setAll(false);
        try {
            Thread.sleep(8000L); //watch the false values
        } catch (InterruptedException e) {
            //ignore
        }
    }

    private void startThread(String name, boolean value) {
        Thread t = new Thread(new RunnableCode(value));
        t.setName(name);
        t.start();
    }

    class RunnableCode implements Runnable {

        private boolean initialValue;

        RunnableCode(boolean value) {
            initialValue = value;
        }

        @Override
        public void run() {
            flag.set(initialValue);
            while (runThread) {
                System.out.println(Thread.currentThread().getName() + ": " + flag.get());
                try {
                    Thread.sleep(4000L);
                } catch (InterruptedException e) {
                    //ignore
                }
            }
        }
    }
}
...