Потокобезопасность при переборе параллельных коллекций - PullRequest
1 голос
/ 16 октября 2010

Я пишу какое-нибудь клиент-серверное приложение, где мне приходится иметь дело с несколькими потоками. У меня есть несколько серверов, которые отправляют живые пакеты каждые несколько секунд. Эти серверы поддерживаются в ConcurrentHashMap, который содержит их конечные точки в паре со временем, когда последний живой пакет прибыл на соответствующий сервер.

Теперь у меня есть поток, который должен "перебрать" все серверы, которые не отправляли живые пакеты в течение определенного времени.

Полагаю, я не могу просто так, не так ли?

for( IPEndPoint server : this.fileservers.keySet() )
{
    Long time = this.fileservers.get( server );

    //If server's time is updated here, I got a problem

    if( time > fileserverTimeout )
        this.fileservers.remove( server );
}

Есть ли способ, которым я могу обойти это, не получая блокировки для всего цикла (что я тогда должен уважать и в других потоках)?

Ответы [ 4 ]

3 голосов
/ 17 октября 2010

Вероятно, здесь нет проблем, в зависимости от того, что именно вы храните на карте. Ваш код кажется мне немного странным, поскольку вы, похоже, сохраняете «продолжительность, в течение которой сервер не был активен».

Моя первая идея для записи этих данных состояла в том, чтобы хранить «последнюю временную метку, при которой сервер был активен». Тогда ваш код будет выглядеть так:

package so3950354;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ServerManager {

  private final ConcurrentMap<Server, Long> lastActive = new ConcurrentHashMap<Server, Long>();

  /** May be overridden by a special method for testing. */
  protected long now() {
    return System.currentTimeMillis();
  }

  public void markActive(Server server) {
    lastActive.put(server, Long.valueOf(now()));
  }

  public void removeInactive(long timeoutMillis) {
    final long now = now();

    Iterator<Map.Entry<Server, Long>> it = lastActive.entrySet().iterator();
    while (it.hasNext()) {
      final Map.Entry<Server, Long> entry = it.next();
      final long backThen = entry.getValue().longValue();
      /*
       * Even if some other code updates the timestamp of this server now,
       * the server had timed out at some point in time, so it may be
       * removed. It's bad luck, but impossible to avoid.
       */
      if (now - backThen >= timeoutMillis) {
        it.remove();
      }
    }
  }

  static class Server {

  }
}

Если вы действительно хотите избежать того, чтобы ни один код не вызывал markActive во время вызова removeInactive, нет никакого способа обойти явную блокировку. Что вы, вероятно, хотите:

  • разрешены одновременные вызовы на markActive.
  • во время markActive звонки на removeInactive запрещены.
  • во время removeInactive звонки на markActive запрещены.

Это похоже на типичный сценарий для ReadWriteLock, где markActive - это операция «чтения», а removeInactive - «операция записи».

1 голос
/ 17 октября 2010

Я не вижу, как другой поток может обновить время сервера в этот момент в вашем коде.После того, как вы извлекли время сервера с карты, используя this.fileservers.get( server ), другой поток не может изменить его значение, поскольку объекты Long являются неизменяемыми.Да, другой поток может поместить новый объект Long для этого сервера в карту, но это не влияет на этот поток, потому что он уже извлек время сервера.

Итак, я могу 'Я не вижу ничего плохого в вашем коде.Итераторы в ConcurrentHashMap слабо согласованы , что означает, что они могут допускать параллельную модификацию, поэтому нет никакого риска возникновения исключения ConcurrentModificationException.

0 голосов
/ 16 октября 2010

(См. ответ Роланда , который берет идеи здесь и конкретизирует их в более полный пример с некоторыми замечательными дополнительными сведениями.)

Поскольку это одновременная хэш-карта, вы можетесделайте следующее.Обратите внимание, что все итераторы CHM реализуют необязательные методы, включая remove(), который вы хотите.См. CHM API docs , в котором говорится:

Этот класс, его представления и итераторы реализуют все необязательные методы интерфейсов Map и Iterator.

Этот код должен работать (я не знаю тип Key в вашем CHM):

ConcurrentHashMap<K,Long> fileservers = ...;

for(Iterator<Map.Entry<K,Long>> fsIter = fileservers.entrySet().iterator(); fileservers.hasNext(); )
{
    Map.Entry<K,Long> thisEntry = fsIter.next();
    Long time = thisEntry.getValue();

    if( time > fileserverTimeout )
        fsIter.remove( server );
}

Но учтите, что в других местах могут быть условия гонки ... Вынеобходимо убедиться, что другие части кода, обращающиеся к карте, могут справиться с этим видом самопроизвольного удаления - т. е., вероятно, всякий раз, когда вы касаетесь fileservers.put(), вам понадобится немного логики, включающей fileservers.putIfAbsent().Это решение с меньшей вероятностью создает узкие места, чем использование synchronized, но оно также требует немного больше размышлений.

Где вы написали: «Если время сервера обновляется здесь, у меня возникла проблема», то именно там putIfAbsent() входит. Если запись отсутствует, либо вы ее раньше не видели, либо вы недавно сбросили ее со стола.Если необходимо согласовать две стороны, вы можете вместо этого ввести блокируемую запись для записи и выполнить синхронизацию на этом уровне (т. Е. Синхронизировать запись при выполнении remove(), а не навесь стол).Тогда конец вещей put() также может синхронизироваться с одной и той же записью, исключая потенциальную расу.

0 голосов
/ 16 октября 2010

Сначала сделайте карту синхронизированной

this.fileservers = Collections.synchronizedMap(Map)

, затем используйте стратегию, которая используется в классах Singleton

if( time > fileserverTimeout )
    {
        synchronized(this.fileservers)
        {
             if( time > fileserverTimeout )
                  this.fileservers.remove( server );
        }
    }

Теперь это гарантирует, что после того, как вы окажетесь внутри синхронизированного блока, обновления не смогутпроисходят.Это происходит потому, что после того, как блокировка на карте установлена, сама карта (синхронизированная оболочка) не будет доступна для обеспечения блокировки потока для обновления, удаления и т. Д.

Двойная проверка времени гарантирует, что синхронизация будет выполненаиспользуется только при наличии подлинного случая удаления

...