Как я могу ждать () для одного объекта, а затем уведомить все о другом? - PullRequest
3 голосов
/ 29 ноября 2010

Я считаю, что проблема, с которой я сталкиваюсь, - это вариант блокировки вложенного монитора.В основном у меня есть две группы потоков (не ThreadGroups, просто логические группы).Одна группа потоков (скажем, фоновая группа) будет ожидать объекта, пока другая группа потоков работает (рабочая группа).Один за другим рабочие потоки завершаются, пока, наконец, последний рабочий поток не окажется в методе complete.То, что я хочу сделать, - это выяснить, какой метод заставляет этот последний рабочий поток ждать, и затем вызывает notifyAll () для пробуждения всех фоновых потоков.Как вы, вероятно, можете догадаться, две группы потоков переключаются взад-вперед - одна группа работает, а другая ждет, а затем группы переключаются.Проблема в том, что если я уведомляю All () о текущих ожидающих потоках, то нет никакой гарантии, что последний рабочий поток перейдет к вызову wait () до завершения уведомленных потоков и попытается запустить следующий обмен.

Извините, если этот вопрос немного устарел - кажется, чем больше я работаю над параллелизмом, тем более запутанным становится мой код: (

Ответы [ 4 ]

2 голосов
/ 29 ноября 2010

Звучит так, будто вам нужно что-то вроде класса Gate, состоящего из двух экземпляров CountDownLatch.Я использую нечто подобное во многих многопоточных тестах.

Все ожидающие потоки вызывают gate.ready(), а рабочие вызывают gate.go() после завершения

Обратите внимание, что в этой конкретной реализации предполагается 1 поток координатора.Чтобы поддержать больше, просто создайте защелку go с количеством нужных вам потоков официантов.

/**
 * Simple starting gate for co-ordinating a bunch of threads.
 */
final class Gate {
  final CountDownLatch ready;
  final CountDownLatch go = new CountDownLatch(1);

  Gate(final int threads) {
    ready = new CountDownLatch(threads);
  }

  /**
   * Called from the racing threads when ready. They will then block until all
   * threads are at this point;
   */
  void ready() {
    ready.countDown();
    await(go);
  }

  /**
   * Called from the starter thread. Blocks until everybody is ready, and then
   * signals go.
   */
  void go() {
    await(ready);
    go.countDown();
  }

  static void await(final CountDownLatch latch) {
    try {
      if (!latch.await(5, TimeUnit.SECONDS)) { // arbitrary, parameterise for production use
        throw new TimedOutException()
      }
    } catch (final InterruptedException e) {
      throw new RuntimeException(e);
    }
  }

  static final class TimedOutException extends IllegalStateException {}
}

Если вам нужно неизвестное число произвольных потоков, вам, вероятно, нужно что-то похожее на класс Phaser Дуга Ли вJava7.

1 голос
/ 29 ноября 2010

Вы можете попробовать подключить группы потоков к Exchanger . Классически он используется для передачи работы между двумя потоками, которые чередуют работу. Похоже, что вы могли бы заставить его работать и для групп потоков, если вы можете заставить передачу работать правильно.

Что если у вас есть поток контроллеров для каждой группы? Затем вы можете иметь контроллер, уведомляющий всех о своей группе, когда он получил элемент в обменнике, а затем присоединиться ко всей своей собственной группе. Когда все соединения вернутся, он сможет вернуть контроль над обменником.

Или, если число потоков в каждой группе фиксированное, вы можете создать CyclicBarrier для группы с фиксированным числом потоков, а затем указать действие барьера, которое будет выполняться после завершения всех потоков и поразить барьер. Это действие может передавать управление через Exchanger или SynchronousQueue (который является очередью 0 длины, которая обеспечивает синхронную координацию).

Для получения дополнительной информации о синхронизаторах, ознакомьтесь с Параллелизмом Java на практике или Рефкарта параллелизма DZone .

0 голосов
/ 29 ноября 2010

Можно ли разрешить потокам возвращаться и завершаться вместо того, чтобы заставлять их ждать?Если это так, рассматривали ли вы реализацию диспетчера потоков для порождения потоков и инициирования управления для каждой группы?

многопоточный процесс:

public void run()
{
    while (workRemaining())
    {
        doWork();
    }
    this.manager.workCompleted();
}

и в диспетчере потоков:

void workCompleted()
{
    if (--this.runningThreads <= 0)
    {
        spawnNewGroup();
    }
}

void spawnNewGroup()
{
    for (int i=0; i<groupSize; i++)
    {
        startIndividualThread();
        this.runningThreads++;
    }
}
0 голосов
/ 29 ноября 2010

Возможно, вы могли бы использовать некоторую переменную, чтобы указать количество все еще работающих потоков. Итак, когда поток завершается, он использует этот метод:

synchronized void completed() {
    threads_working--;
    if (threads_working == 0) {
        synchronized (some_lock) {
            some_lock.notifyAll();
        }
    }
}

И каждый поток должен увеличивать это число, когда он начинает работать.

...