Java: учебные пособия / объяснения jsr166y Phaser - PullRequest
15 голосов
/ 26 июля 2011

Этот вопрос задавался два года назад, но упоминаемые в нем ресурсы либо не очень полезны (ИМХО), либо ссылки больше не действительны.

Должны быть некоторые хорошие учебники изтам, чтобы понять Phaser.Я прочитал Javadoc, но мои глаза остекленели, так как для того, чтобы действительно понять Javadoc, вы должны знать, как эти классы должны использоваться.

У кого-нибудь есть какие-либо предложения?

Ответы [ 3 ]

45 голосов
/ 26 июля 2011

Для Phaser я ответил на несколько вопросов.Увидеть их может помочь в понимании их приложений.Они связаны внизу.Но чтобы понять, что делает Phaser и почему его полезно, важно знать, что он решает.

Вот атрибуты CountdownLatch и CyclicBarrier

Примечание:

  • Количество партий - это еще один способ сказать # разных потоков
  • НеПовторное использование означает, что вам придется создать новый экземпляр барьера перед повторным использованием
  • Барьер можно продвигать, если поток может прибыть и продолжить работу, не ожидая других или может дождаться завершения всех потоков

Обратный отсчет

  • Фиксированное число сторон
  • Не удалось повторить
  • Усовершенствовано (посмотрите на latch.countDown(); продвигаемый latch.await(); должен ждать )

CyclicBarrier

  • Фиксированное количество партий
  • Многоразовое использование
  • Невозможно продвинуться

Таким образом, CountdownLatch нельзя использовать повторно, вы должны каждый раз создавать новый экземпляр, но он доступен.CylicBarrier можно использовать повторно, но все потоки должны ждать, пока каждая сторона достигнет барьера.

Phaser

  • Динамическое числостороны
  • Многоразовые
  • Продвигаемые

Когда поток хочет быть известным Фазеру, он вызывает phaser.register(), когда поток достигает барьера, который они вызывают phaser.arrive() и вот где это продвигается .Если поток хочет дождаться завершения всех зарегистрированных задач phaser.arriveAndAwaitAdvance()

Существует также концепция фазы, в которой потоки могут ожидать завершения других операций, которые, возможно, не завершены.Как только все потоки достигают барьера фазера, создается новая фаза (с шагом 1).

Вы можете взглянуть на другие мои ответы, может быть, это поможет:

Java ExecutorService: awaitTermination всех рекурсивно созданных задач

Гибкая CountDownLatch?

5 голосов
/ 26 июля 2011

Для Phaser, по крайней мере, я думаю, что JavaDoc предлагает довольно четкое объяснение. Это класс, который вы будете использовать для синхронизации пакета потоков, в том смысле, что вы можете зарегистрировать каждый поток в пакете с помощью Phaser, а затем использовать Phaser, чтобы блокировать их, пока каждый поток в пакете не получит уведомил Phaser, после чего любой заблокированный поток (ы) начнет выполняться. Этот цикл ожидания / уведомления может повторяться снова и снова, по желанию / необходимости.

Их пример кода дает разумный пример (хотя мне очень не нравится их стиль двухсимвольного отступа):

void runTasks(List<Runnable> tasks) {
   final Phaser phaser = new Phaser(1); // "1" to register self
   // create and start threads
   for (final Runnable task : tasks) {
     phaser.register();
     new Thread() {
       public void run() {
         phaser.arriveAndAwaitAdvance(); // await all creation
         task.run();
       }
     }.start();
   }

   // allow threads to start and deregister self
   phaser.arriveAndDeregister();
 } 

Это устанавливает Phaser с количеством регистраций tasks.size() + 1, и для каждой задачи создается новый Thread, который будет блокироваться до следующего продвижения Phaser (то есть время, в которое tasks.size() + 1 прибытия были записаны), а затем запустить соответствующую задачу. Каждый созданный Thread также запускается мгновенно, поэтому Phaser выходит из цикла с записью tasks.size() поступлений.

Последний вызов phaser.arriveAndDeregister() запишет окончательное прибытие, а также уменьшит количество регистраций, чтобы оно теперь равнялось tasks.size(). Это заставляет Phaser двигаться вперед, что фактически позволяет запускать все задачи одновременно. Это можно повторить, выполнив что-то вроде:

void runTasks(List<Runnable> tasks) {
   final Phaser phaser = new Phaser(1); // "1" to register self
   // create and start threads
   for (final Runnable task : tasks) {
     phaser.register();
     new Thread() {
       public void run() {
         while (true) {
           phaser.arriveAndAwaitAdvance(); // await all creation
           task.run();
         }
       }
     }.start();
   }

   // allow threads to start and deregister self
   phaser.arriveAndDeregister();
 }

... это то же самое, что и раньше, за исключением добавления цикла, который вызывает многократное выполнение задачи. Поскольку каждая итерация вызывает phaser.arriveAndAwaitAdvance(), выполнение потоков задачи будет синхронизировано таким образом, что задача-0 не начнет свою вторую итерацию, пока каждая другая задача не завершит свою первую итерацию и не уведомит Phaser, который готов начать свою вторую итерация.

Это может быть полезно, если выполняемые вами задачи сильно различаются по времени, которое они затрачивают на выполнение, и если вы хотите убедиться, что более быстрые потоки не синхронизируются с более медленными.

В качестве возможного реального приложения можно рассмотреть игру, в которой используются отдельные графические и физические потоки. Вы не хотите, чтобы физический поток вычислял данные для кадра 100, если графический поток застрял на кадре 6, и использование Phaser является одним из возможных подходов к обеспечению того, чтобы графические и физические потоки всегда работали на одном кадре. в то же время (а также, что, если один поток значительно медленнее другого, более быстрый поток изящно отдает ресурсы ЦП, так что, надеюсь, более медленный поток сможет быстрее его догнать).

2 голосов
/ 08 мая 2015

Phaser несколько похож на функциональность CyclicBarrier и CountDownLatch, но он обеспечивает большую гибкость, чем оба.

В CyclicBarrier мы привыкли регистрировать сторон в конструкторено Phaser предоставляет нам гибкость регистрации и отмены регистрации сторон в любое время.Для регистрации сторон мы можем использовать любой из следующих конструкторов -

  • * регистр 1010 *
  • или
  • bulkRegister

For отменяя регистрацию сторон, мы можем использовать -

  • receiveAndDeregister или


register- Добавляет / Регистрирует новую непроверенную сторонук этому фазеру.Возвращает

  • номер фазы прибытия, к которой применяется эта регистрация.
  • Если Phaser завершился, значение отрицательное и регистрация не имеет эффекта.

Если происходит вызов метода onAdvance (), то перед возвратом этот метод может ожидать его завершения.Если у этого фазера есть родитель, и с этим фазером не было зарегистрированных сторон, этот дочерний фазер также регистрируется у его родителя. Программа для демонстрации использования Phaser>

import java.util.concurrent.Phaser;

public class PhaserTest {
public static void main(String[] args) {

       /*Creates a new phaser with 1 registered unArrived parties 
        * and initial phase number is 0
        */
       Phaser phaser=new Phaser(1);
       System.out.println("new phaser with 1 registered unArrived parties"
                    + " created and initial phase  number is 0 ");

       //Create 3 threads
       Thread thread1=new Thread(new MyRunnable(phaser,"first"),"Thread-1");
       Thread thread2=new Thread(new MyRunnable(phaser,"second"),"Thread-2");
       Thread thread3=new Thread(new MyRunnable(phaser,"third"),"Thread-3");

       System.out.println("\n--------Phaser has started---------------");
       //Start 3 threads
       thread1.start();
       thread2.start();
       thread3.start();

       //get current phase
       int currentPhase=phaser.getPhase();
       /*arriveAndAwaitAdvance() will cause thread to wait until current phase
        * has been completed i.e. until all registered threads
        * call arriveAndAwaitAdvance()
        */
       phaser.arriveAndAwaitAdvance();
       System.out.println("------Phase-"+currentPhase+" has been COMPLETED----------");

       //------NEXT PHASE BEGINS------

       currentPhase=phaser.getPhase();
       phaser.arriveAndAwaitAdvance();
       System.out.println("------Phase-"+currentPhase+" has been COMPLETED----------");

       /* current thread Arrives and deRegisters from phaser.
        * DeRegistration reduces the number of parties that may
        * be required to advance in future phases.
        */
       phaser.arriveAndDeregister();

       //check whether phaser has been terminated or not.
       if(phaser.isTerminated())
              System.out.println("\nPhaser has been terminated");

} 
}





class MyRunnable implements Runnable{

Phaser phaser;

MyRunnable(Phaser phaser,String name){
       this.phaser=phaser;
       this.phaser.register(); //Registers/Add a new unArrived party to this phaser.
       System.out.println(name +" - New unarrived party has "
                    + "been registered with phaser");
}

@Override
public void run() {
       System.out.println(Thread.currentThread().getName() +
                    " - party has arrived and is working in "
                    + "Phase-"+phaser.getPhase());
       phaser.arriveAndAwaitAdvance();

       //Sleep has been used for formatting output
       try {
              Thread.sleep(1000);
       } catch (InterruptedException e) {
              e.printStackTrace();
       }

       //------NEXT PHASE BEGINS------

       System.out.println(Thread.currentThread().getName() +
                    " - party has arrived and is working in "
                    + "Phase-"+phaser.getPhase());
       phaser.arriveAndAwaitAdvance();  

       phaser.arriveAndDeregister();
}

}


bulkRegister - Добавляет количество сторон для новых непарризованных сторон в этот фазер,Возвращает

  • номер фазы прибытия, к которой применяется данная регистрация.
  • Если фазер завершен, значение отрицательное и регистрация не действует.

Если происходит вызов метода onAdvance (), то перед возвратом этот метод может ожидать его завершения.

receiveAndDeregister- Текущий поток (Сторона) Приходит и снимает регистрацию с фазера.Отмена регистрации уменьшает число сторон, которые могут потребоваться в будущем для перехода на следующую фазу.

Если у этого фазера есть родительский элемент , и не было зарегистрированных партий с этим фазером, этот дочерний фазертакже зарегистрирован со своим родителем. Программа для демонстрации Phaser для родителей и детей

import java.util.concurrent.Phaser;

public class PhaserParentChildTest {
public static void main(String[] args) {


    /*
  * Creates a new phaser with no registered unArrived parties.
  */
    Phaser parentPhaser = new Phaser();

    /*
  * Creates a new phaser with the given parent &
  * no registered unArrived parties.
  */
    Phaser childPhaser = new Phaser(parentPhaser,0);

    childPhaser.register();

    System.out.println("parentPhaser isTerminated : "+parentPhaser.isTerminated());
    System.out.println("childPhaser isTerminated : "+childPhaser.isTerminated());

    childPhaser.arriveAndDeregister();
    System.out.println("\n--childPhaser has called arriveAndDeregister()-- \n");

    System.out.println("parentPhaser isTerminated : "+parentPhaser.isTerminated());
    System.out.println("childPhaser isTerminated : "+childPhaser.isTerminated());

}
}

Подробнее о Phaser на http://www.javamadesoeasy.com/2015/03/phaser-in-java_21.html

...