Синхронизированный блок Java с использованием вызова метода для получения объекта синхронизации - PullRequest
2 голосов
/ 09 ноября 2010

Мы пишем код блокировки и столкнулись со странным вопросом. Мы используем ConcurrentHashMap для выборки экземпляров Object, на которых мы заблокированы. Таким образом, наши синхронизированные блоки выглядят так:

synchronized(locks.get(key)) { ... }

Мы переопределили метод get ConcurrentHashMap, чтобы он всегда возвращал новый объект, если он не содержал его для ключа.

@Override
public Object get(Object key) {
   Object o = super.get(key);
   if (null == o) {
      Object no = new Object();
      o = putIfAbsent((K) key, no);
      if (null == o) {
         o = no;
      }
   }
   return o;
}

Но есть ли состояние, в котором метод get вернул объект, но поток еще не вошел в синхронизированный блок. Разрешение другим потокам получить тот же объект и заблокировать его.

У нас есть потенциальное состояние гонки:

  • поток 1: получает объект с ключом A, но не входит в синхронизированный блок
  • поток 2: получает объект с ключом A, входит в синхронизированный блок
  • поток 2: удаляет объект с карты, выходит из синхронизированного блока
  • поток 1: входит в синхронизированный блок с объектом, которого больше нет на карте
  • поток 3: получает новый объект для ключа A (не тот объект, который получил поток 1)
  • поток 3: входит в синхронизированный блок, в то время как поток 1 также находится в своем синхронизированном блоке, используя клавишу A

Эта ситуация была бы невозможна, если бы java вошел в синхронизированный блок сразу после возврата вызова для получения. Если нет, есть ли у кого-нибудь информация о том, как мы можем удалять ключи, не беспокоясь об этом состоянии гонки?

Ответы [ 5 ]

3 голосов
/ 09 ноября 2010

На мой взгляд, проблема возникает из-за того, что вы блокируете значения карты , в то время как на самом деле вам нужно заблокировать ключ (или какой-то его производной),Если я правильно понимаю, вы хотите, чтобы 2 потока не запускали критическую секцию, используя один и тот же ключ.

Возможно ли заблокировать ключи?Можете ли вы гарантировать, что вы всегда используете один и тот же экземпляр ключа?

Хорошая альтернатива:

Не удаляйте блокировки вообще.Используйте ReferenceMap со слабыми значениями.Таким образом, запись на карте удаляется, только если она не используется ни одним потоком.

Примечание:

1) Теперь вам придется синхронизировать эту карту (используя Collections.synchronizedMap (..)).

2) Вам также необходимо синхронизировать кодкоторый генерирует / возвращает значение для данного ключа.

1 голос
/ 10 ноября 2010

Спасибо за все замечательные предложения и идеи, очень ценю это!В конце концов, это обсуждение заставило меня придумать решение, которое не использует объекты для блокировки.

Просто краткое описание того, что мы на самом деле делаем.

У нас есть кэш, который получает данные непрерывноиз нашей среды.Кэш имеет несколько «сегментов» для каждого ключа и агрегирует события в сегменты по мере их поступления. Входящие события имеют ключ, который определяет используемую запись кэша, и временную метку, определяющую сегмент в записи кэша, который должен бытьувеличивается.

Кэш также имеет внутреннюю задачу очистки, которая запускается периодически.Он будет перебирать все записи в кэше и сбрасывать все сегменты, кроме текущего, в базу данных.

Теперь временные метки входящих данных могут быть в любое время в прошлом, но большинство из них - для самых последних временных меток.Таким образом, текущий бак будет получать больше попаданий, чем бакетов за предыдущие интервалы времени.

Зная это, я могу продемонстрировать условия гонки, которые у нас были.Весь этот код предназначен для одной записи кэша, поскольку проблема была изолирована от одновременной записи и очистки отдельных элементов кэша.

// buckets :: ConcurrentMap<Long, AtomicLong>

void incrementBucket(long timestamp, long value) {
   long key = bucketKey(timestamp, LOG_BUCKET_INTERVAL);
   AtomicLong bucket = buckets.get(key);
   if (null == bucket) {
      AtomicLong newBucket = new AtomicLong(0);
      bucket = buckets.putIfAbsent(key, newBucket);
      if (null == bucket) {
          bucket = newBucket;
      }
   }

   bucket.addAndGet(value);
}

Map<Long, Long> flush() {
   long now = System.currentTimeMillis();
   long nowKey = bucketKey(now, LOG_BUCKET_INTERVAL);

   Map<Long, Long> flushedValues = new HashMap<Long, Long>();
   for (Long key : new TreeSet<Long>(buckets.keySet())) {
      if (key != nowKey) {
         AtomicLong bucket = buckets.remove(key);
         if (null != bucket) {
            long databaseKey = databaseKey(key);
            long n = bucket.get()
            if (!flushedValues.containsKey(databaseKey)) {
               flushedValues.put(databaseKey, n);
            } else {
               long sum = flushedValues.get(databaseKey) + n;
               flushedValues.put(databaseKey, sum);
            }
         }
      }
   }

   return flushedValues;
}

Что могло произойти: (fl = flush thread, it =инкрементный поток)

  • it: входит в incrementBucket, выполняется до того момента, как вызов addAndGet (значение)
  • fl: входит во флэш и выполняет итерации сегментов
  • fl: достигает увеличиваемого сегмента
  • fl: удаляет его и вызывает bucket.get () и сохраняет значение в сбрасываемых значениях
  • it: увеличивает интервал (который будетпотерян сейчас, потому что ведро было очищено и удалено)

Решение:

void incrementBucket(long timestamp, long value) {
   long key = bucketKey(timestamp, LOG_BUCKET_INTERVAL);

   boolean done = false;
   while (!done) {
      AtomicLong bucket = buckets.get(key);
      if (null == bucket) {
         AtomicLong newBucket = new AtomicLong(0);
         bucket = buckets.putIfAbsent(key, newBucket);
         if (null == bucket) {
             bucket = newBucket;
         }
      }

      synchronized (bucket) {
         // double check if the bucket still is the same
         if (buckets.get(key) != bucket) {
            continue;
         }
         done = true;

         bucket.addAndGet(value);
      }
   }
}

Map<Long, Long> flush() {
   long now = System.currentTimeMillis();
   long nowKey = bucketKey(now, LOG_BUCKET_INTERVAL);

   Map<Long, Long> flushedValues = new HashMap<Long, Long>();
   for (Long key : new TreeSet<Long>(buckets.keySet())) {
      if (key != nowKey) {
         AtomicLong bucket = buckets.get(key);
         if (null != value) {
            synchronized(bucket) {
               buckets.remove(key);
               long databaseKey = databaseKey(key);
               long n = bucket.get()
               if (!flushedValues.containsKey(databaseKey)) {
                  flushedValues.put(databaseKey, n);
               } else {
                  long sum = flushedValues.get(databaseKey) + n;
                  flushedValues.put(databaseKey, sum);
               }
            }
         }
      }
   }

   return flushedValues;
}

Я надеюсь, что это будет полезно для других, которые могут столкнуться с той же проблемой.

1 голос
/ 09 ноября 2010

у вас есть 2 варианта:

а. Вы можете проверить карту один раз внутри синхронизированного блока.

Object o = map.get(k);
synchronized(o) {
  if(map.get(k) != o) {
    // object removed, handle...
  }
}

б. Вы можете расширить свои значения, чтобы содержать флаг, указывающий их статус. когда значение удаляется с карты, вы устанавливаете флаг, указывающий, что оно было удалено (в блоке синхронизации).

CacheValue v = map.get(k);
sychronized(v) {
  if(v.isRemoved()) {
    // object removed, handle...
  }
}
1 голос
/ 09 ноября 2010

Код, как есть, является потокобезопасным. При этом, если вы удаляете из CHM, то любые предположения, которые делаются при синхронизации объекта, возвращенного из коллекции, будут потеряны.

Но существует ли состояние, в котором get-метод возвратил объект, но нить еще не вошла в синхронизированный блок. Позволяя другим потоки, чтобы получить тот же объект и замкни его.

Да, но это происходит каждый раз, когда вы синхронизируете объект. Что гарантированно, так это то, что другой поток не войдет в синхронизированный блок, пока другой не будет существовать.

Если нет, есть ли у кого-нибудь как мы могли бы удалить ключи без приходится беспокоиться об этой гонке состояние

Единственный реальный способ обеспечения этой атомарности - это либо синхронизировать на CHM, либо на другом объекте (совместно используемом всеми потоками). Лучший способ - не удалять из CHM.

0 голосов
/ 09 ноября 2010

Два предоставленных вами фрагмента кода отлично , как они есть.То, что вы сделали, похоже на то, как ленивое создание экземпляров с помощью MapMaker.makeComputingMap () в Guava может работать, но я не вижу проблем с тем, как лениво создаются ключи.

You 'верно то, что поток может быть предварительно выгружен после поиска get() объекта блокировки, но до входа в синхронизированный.

Myпроблема с третьим пунктом в описании вашей расы.Вы говорите:

поток 2: удаляет объект с карты, выходит из синхронизированного блока

Какой объект и какая карта?В общем, я предположил, что вы искали ключ для блокировки, а затем выполняли некоторые другие операции над другими структурами данных в синхронизированном блоке.Если вы говорите об удалении объекта блокировки из ConcurrentHashMap, упомянутого в начале, это огромное различие.

И реальный вопрос в том, нужно ли это вообще.В среде общего назначения я не думаю, что возникнут какие-либо проблемы с памятью, если вспомнить все объекты блокировки для всех ключей, которые когда-либо просматривались (даже если эти ключи больше не представляют живые объекты).На намного сложнее придумать какой-нибудь способ безопасного удаления объекта, который может быть сохранен в локальной переменной какого-либо другого потока в любое время, и если вы действительно хотите пойти по этому пути, у меня естьощущение, что производительность ухудшится по сравнению с одним грубым замком вокруг поиска ключа.

Если я неправильно понял, что там происходит, не стесняйтесь меня поправлять.

Редактировать: ОК - в этом случае я придерживаюсь своего утверждения, что самый простой способ сделать это - не удалить ключи;на самом деле это может быть не так проблематично, как вы думаете, так как скорость роста пространства будет очень мала.По моим расчетам (что может быть не так, я не эксперт в космических расчетах, и ваша JVM может отличаться) карта увеличивается примерно на 14 КБ / час.Вам понадобится год непрерывной работы, прежде чем эта карта израсходует 100 МБ пространства кучи.

Но давайте предположим, что ключи действительно нужно удалить.Это создает проблему, заключающуюся в том, что вы не можете удалить ключ, пока не узнаете, что ни один из потоков не использует его.Это приводит к проблеме "курицы и яйца", когда вам нужно будет синхронизировать все потоки на чем-то иначе , чтобы получить атомарность (проверки) и видимость между потоками, что затем означает, что вы не можетеДелайте гораздо больше, чем шлепайте один синхронизированный блок вокруг всего, полностью подрывая вашу стратегию чередования блокировок.

Давайте вернемся к ограничениям.Главное здесь, чтобы все прояснилось со временем .Это не ограничение правильности, а просто проблема с памятью.Следовательно, что мы действительно хотим сделать, это определить некоторую точку, в которой ключ определенно больше не может быть использован, а затем использовать это как триггер, чтобы удалить его с карты.Здесь есть два случая:

  1. Вы можете идентифицировать такое состояние и логически проверить его.В этом случае вы можете удалить ключи с карты с (в худшем случае) каким-то потоком таймера или, надеюсь, с некоторой логикой, которая более четко интегрирована с вашим приложением.
  2. Вы не можете определить какое-либо условие, с помощью которого вы знать , что ключ больше не будет использоваться.В этом случае, по определению, нет точки, в которой можно безопасно удалить ключи с карты.Так что на самом деле, ради правильности, вы должны оставить их внутри.

В любом случае это фактически сводится к ручному сбору мусора.Удалите ключи с карты, когда вы можете лениво определить, что они больше не будут использоваться.Ваше текущее решение здесь слишком нетерпеливо, поскольку (как вы указали) оно выполняет удаление до того, как эта ситуация сохранится.

...