Какие классы параллелизма Java могут мне помочь? - PullRequest
3 голосов
/ 19 марта 2010

Я пытаюсь написать код, который имеет следующее поведение:

  • Существует множество одновременных и случайных вызовов X различными потоками (Тема X)
  • В какой-то момент в будущем один вызов будет выполнен одним потоком (поток Y)
  • До тех пор, пока не будет вызван Y, X должно быть разрешено проходить без вызова, при этом одновременные вызовы X совершенно действительны
  • После вызова Y любые существующие вызовы X должны быть завершены, но новые вызовы X должны быть отклонены каким-либо образом (RuntimeException и т. Д.)
  • Y не должен продолжать выполнение до тех пор, пока все существующие вызовы X не будут завершены
  • Обновление : Когда вызывается Y, он должен посылать сигнал объектам, работающим в потоке X, который говорит им прервать () некоторым изящным способом (в идеале заканчивая очень быстро, чтобы Y мог продолжить)

Я смотрел на использование Semaphore, CountDownLatch и даже писал свой собственный AbstractQueuedSynchronizer, но ни один из них не соответствовал вышеуказанным требованиям. Например, CountDownLatch предполагает, что вы знаете, сколько будет сделано вызовов X, чего мы не знаем.

Кажется, я почти хочу смесь CountUpDownLatch и, может быть, какой-то простой AtomicBoolean, но это то, что я сейчас использую, и время от времени я зацикливаюсь на себе. С другой стороны, я использую схематично выглядящую реализацию CountUpDownLatch, которая является частью HSQLDB, которая серьезно не выглядит поточно-безопасной.

Есть идеи о том, как мне следует подойти к этой проблеме?

Ответы [ 4 ]

3 голосов
/ 19 марта 2010

Похоже на задание для ReentrantReadWriteLock. Примерно так:

class A {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    public void x() {
        if (!lock.readLock().tryLock()) throw new RuntimeException();
        try {
            ...
        } finally {
            lock.readLock().unlock();
        }
    }

    public void y() {
        lock.writeLock().lock();
        try {
            ...
        } finally {
            lock.writeLock().unlock();
        }
    }
}
2 голосов
/ 19 марта 2010

Java 7 выходит с новым классом конструкции параллелизма, java.util.concurrent.Phaser. Фазер - это прославленный цилиндрический барьер, который позволяет ожидать определенных фаз (или итераций) параллельного выполнения. Вы можете скачать последнюю бинарную флягу на сайте по интересам JSR 166 .

Я подумал, что у вас есть изменяемый логический флаг yHasEnetered, который по умолчанию устанавливает значение false. Таким образом, в вашем исполнении X вы можете сделать:

        if(yHasEnetered)
            throw new IllegalStateException();
        phaser.register();
        //do work here
        phasre.arrive();

Фазер сам будет вести подсчет всех зарегистрированных партий, поэтому, когда Y-нить вошла, он может установить соответствующий флаг, зарегистрировать себя и дождаться продвижения.

        yHasEntered=true;
        int phase = phaser.register();
        phaser.arriveAndAwaitAdvance(phase);
        //do y work here
        yHasEntered=false;

На данный момент для Y потока. Он зарегистрирует себя, получит фазу, на которой в данный момент находится фазер, приедет и будет ждать, пока все остальные выполняющиеся потоки достигнут своего соответствующего блока поступления.

0 голосов
/ 19 марта 2010

Хорошо, как насчет использования ActiveObject?

class X {
    private static final ExecutorService service = Executors.newCachedThreadPool();


    public void x(){
       //Using anonymous class for brevity. Normal Runnable is ok. May be static inner class.  
       service.submit(new Runnable(){
                             @Override
                             public void run(){
                                   //do stuff
                             }
                       });
    }

    //Derived from Java6 ExecutorService API        
    public void shutDownAndAwait(){
        service.shutdownNow();
        try{
           if (!service.awaitTermination(60, TimeUnit.SECONDS))
              throw new TerminationException("Service did not terminate");
           }
        } catch (InterruptedException ie) {
           // (Re-)Cancel if current thread also interrupted
           service.shutdownNow();
           // Preserve interrupt status
           Thread.currentThread().interrupt();
        }
   }//shutdownAndAwait

}//X


class Y {

    private final X x = //inject reference

    public void y(){
        x.shutdownAndAwait();
    }
}

Проблема в том, что теперь выполнение x () является асинхронным, поэтому исходный поток, вызвавший x (), не уверен, когда он был завершен. И они не прерываются при вызове y () ...

0 голосов
/ 19 марта 2010

Рассмотрите возможность использования реализации ExecutorService . Добавьте задачи, используя методы submit (), а затем скажите службе не принимать больше задач с помощью метода shutdown () - задачи, которые уже были приняты, будут нормально выполняться.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...