Требование блокировки чтения и записи Java с блокировкой и освобождением из разных потоков - PullRequest
4 голосов
/ 03 августа 2011

Я пытаюсь найти менее неуклюжее решение проблемы параллелизма Java.

Суть проблемы в том, что мне нужен вызов завершения работы для блокировки, пока рабочие потоки все еще активны, но крайне важноаспект заключается в том, что рабочие задачи создаются и выполняются асинхронно, поэтому удержание и освобождение должны выполняться разными потоками.Мне нужно, чтобы они как-то отправили сигнал в ветку завершения работы после завершения их работы.Просто чтобы сделать вещи более интересными, рабочие потоки не могут блокировать друг друга, поэтому я не уверен насчет применения семафора в этом конкретном случае.

У меня есть решение, которое, я думаю, безопасно выполняет свою работу, но мойнезнакомство с утилитами параллелизма Java заставляет меня думать, что может существовать гораздо более простой или более элегантный шаблон.Любая помощь в этом отношении будет принята с благодарностью.

Вот то, что у меня есть, довольно скудно, за исключением комментариев:

final private ReentrantReadWriteLock shutdownLock = new ReentrantReadWriteLock();
volatile private int activeWorkerThreads;
private boolean isShutdown;

private void workerTask()
{
   try
   {
      // Point A: Worker tasks mustn't block each other.
      shutdownLock.readLock().lock();

      // Point B: I only want worker tasks to continue if the shutdown signal
      // hasn't already been received.
      if (isShutdown)
         return;

      activeWorkerThreads ++;

      // Point C: This async method call returns immediately, soon after which
      // we release our lock. The shutdown thread may then acquire the write lock
      // but we want it to continue blocking until all of the asynchronous tasks
      // have completed.
      executeAsynchronously(new Runnable()
      {
         @Override
         final public void run()
         {
            try
            {
              // Do stuff.
            }
            finally
            {
               // Point D: Release of shutdown thread loop, if there are no other
               // active worker tasks.
               activeWorkerThreads --;
            }
         }
      });
   }
   finally
   {
      shutdownLock.readLock().unlock();
   }
}


final public void shutdown()
{
   try
   {
      // Point E: Shutdown thread must block while any worker threads
      // have breached Point A.
      shutdownLock.writeLock().lock();

      isShutdown = true;

      // Point F: Is there a better way to wait for this signal?
      while (activeWorkerThreads > 0)
         ;

      // Do shutdown operation.
   }
   finally
   {
      shutdownLock.writeLock().unlock();
   }
}

Заранее спасибо за любую помощь!

Russ

Ответы [ 5 ]

3 голосов
/ 03 августа 2011

Объявление activeWorkerThreads как volatile не позволяет вам делать activeWorkerThreads ++, так как ++ это просто сокращение,

activeWorkerThreads = activeWorkerThreads + 1;

Что не атомарно. Вместо этого используйте AtomicInteger.

Отправляет ли executeAsynchronously () задания в ExecutorService? Если это так, вы можете просто использовать метод awaitTermination , так что ваш хук отключения будет,

executor.shutdown();
executor.awaitTermination(1, TimeUnit.Minutes); 
2 голосов
/ 04 августа 2011

Вы можете использовать семафор в этом сценарии и не требовать занятого ожидания вызова shutdown ().Это можно представить как набор билетов, которые раздают работникам, чтобы указать, что они находятся в полете.Если метод shutdown () может получить все билеты, то он знает, что он опустошил всех рабочих и не выполняет никаких действий.Поскольку #acquire () является блокирующим вызовом, shutdown () не будет вращаться.Я использовал этот подход для распределенной библиотеки master-worker и легко расширяю ее для обработки тайм-аутов и повторов.

Executor executor = // ...
final int permits = // ...
final Semaphore semaphore = new Semaphore(permits);

void schedule(final Runnable task) {
  semaphore.acquire();
  try {
    executor.execute(new Runnable() {
      @Override public run() {
        try {
          task.run();
        } finally {
          semaphore.release();
        }
      }
    });
  } catch (RejectedExecutionException e) {
    semaphore.release();
    throw e;
  }
}

void shutDown() {
  semaphore.acquireUninterruptibly(permits);

  // do stuff
}
2 голосов
/ 03 августа 2011

Вау, Нелли. Никогда не делай этого:

  // Point F: Is there a better way to wait for this signal?
  while (activeWorkerThreads > 0)
     ;

Вы вращаете и потребляете процессор. Используйте правильное уведомление:

Сначала: synchronize для объекта, , затем , проверьте activeWorkerThreads и wait() для объекта, если он все еще> 0:

synchronized (mutexObject) {
    while (activeWorkerThreads > 0) {
        mutexObject.wait();
    }
}

Второе: попросите рабочих уведомить () объект после того, как они уменьшат счет activeWorkerThreads. Вы должны синхронизироваться на объекте перед вызовом notify.

synchronized (mutexObject) {
    activeWorkerThreads--;
    mutexObject.notify();
}

В-третьих: видя, как вы (после реализации 1 и 2) синхронизируете объект, всякий раз, когда вы касаетесь activeWorkerThreads, используйте его в качестве защиты; нет необходимости, чтобы переменная была изменчивой.

Тогда: тот же объект, который вы используете в качестве мьютекса для управления доступом к activeWorkerThreads, также может использоваться для управления доступом к isShutdown. Пример:

synchronized (mutexObject) {
    if (isShutdown) {
        return;
    }
}

Это не приведет к тому, что работники будут блокировать друг друга, за исключением неизмеримо небольшого количества времени (которого вы, скорее всего, не избежите, используя блокировку чтения-записи).

2 голосов
/ 03 августа 2011

ExecutorService должно быть предпочтительным решением в качестве упомянутых мостиков.

В качестве альтернативы, если число рабочих потоков фиксировано, тогда вы можете использовать CountDownLatch :

final CountDownLatch latch = new CountDownLatch(numberOfWorkers);

Передайте latch каждому рабочему потоку и вызовите latch.countDown(), когда задача будет выполнена.

Вызовите latch.await() из основного потока, чтобы дождаться завершения всех задач.

0 голосов
/ 03 августа 2011

Это больше похоже на комментарий к ответу sbridges, но это было слишком долго для отправки в качестве комментария.

В любом случае, только 1 комментарий.

Когда вы выключаете исполнителя, отправка нового задания исполнителю приведет к отмене RejectedExecutionException, если вы используете реализации по умолчанию (как Executors.newSingleThreadExecutor()).Поэтому в вашем случае вы, вероятно, захотите использовать следующий код:

code:

new ThreadPoolExecutor(1,
                       1,
                       1,
                       TimeUnit.HOURS,
                       new LinkedBlockingQueue<Runnable>(),
                       new ThreadPoolExecutor.DiscardPolicy());

Таким образом, задачи, которые были отправлены исполнителю после вызова shutdown(), простоигнорируются.Приведенный выше параметр (1,1 ... и т. Д.) Должен создать исполнителя, который в основном является однопоточным, но не вызывает исключение времени выполнения.

...