Выбор потока для выполнения барьерного действия - Java CyclicBarrier - PullRequest
6 голосов
/ 24 декабря 2011

Глядя на javadocs для CyclicBarrier, я нашел следующее утверждение в документации класса, которое я не совсем понимаю. Из Javadoc :

Если барьерное действие не зависит от того, какие стороны были приостановлены при его выполнении, тогда любой из потоков в стороне может выполнить это действие после его отмены. Чтобы облегчить это, каждый вызов await () возвращает индекс прибытия этого потока в барьер. Затем вы можете выбрать, какой поток должен выполнить барьерное действие, например:

if (barrier.await() == 0) {
  // log the completion of this iteration
} 

Может кто-нибудь объяснить, как назначить конкретный поток для выполнения барьерного действия после того, как все стороны вызвали .await (), и, возможно, привести пример?

Ответы [ 3 ]

3 голосов
/ 24 декабря 2011

ОК, представьте, что RuPaul хотел несколько рабочих потоков, но только 3-й, который закончил, должен выполнить барьерную задачу (скажем «Sashay, Chante»).

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

public class Main
{

   private static class Worker implements Runnable {

      private CyclicBarrier barrier;

      public Worker(CyclicBarrier b) {
         barrier = b;
      }

      public void run() {
         final String threadName = Thread.currentThread().getName();

         System.out.printf("%s:  You better work!%n", threadName);
         // simulate the workin' it part
         Random rnd = new Random();
         int secondsToWorkIt = rnd.nextInt(10) + 1;

         try {
            TimeUnit.SECONDS.sleep(secondsToWorkIt);
         } catch (InterruptedException ex) { /* ...*/ }

         System.out.printf("%s worked it, girl!%n", threadName);

         try {
            int n = barrier.await();
            final int myOrder = barrier.getParties() - n;
            System.out.printf("Turn number: %s was %s%n", myOrder, threadName);

            // MAGIC CODE HERE!!!
            if (myOrder == 3) { // the third one that finished
               System.out.printf("%s: Sashay Chante!%n", myOrder);
            }
            // END MAGIC CODE
         }
         catch (BrokenBarrierException ex) { /* ... */ }
         catch (InterruptedException ex) { /* ... */ }
      }
   }

   private final int numThreads = 5;

   public void work() {
      /*
       * I want the 3rd thread that finished to say "Sashay Chante!"
       * when everyone has called await.
       * So I'm not going to put my "barrier action" in the CyclicBarrier constructor,
       * where only the last thread will run it! I'm going to put it in the Runnable
       * that calls await.
       */
      CyclicBarrier b = new CyclicBarrier(numThreads);

      for (int i= 0; i < numThreads; i++) {
         Worker task = new Worker(b);
         Thread thread = new Thread(task);
         thread.start();
      }
   }

   public static void main(String[] args)
   {
      Main main = new Main();
      main.work();
   }

}

Вот примервыходные данные:

Thread-0:  You better work!
Thread-4:  You better work!
Thread-2:  You better work!
Thread-1:  You better work!
Thread-3:  You better work!
Thread-1 worked it, girl!
Thread-4 worked it, girl!
Thread-0 worked it, girl!
Thread-3 worked it, girl!
Thread-2 worked it, girl!
Turn number: 5 was Thread-2
Turn number: 3 was Thread-0
3: Sashay Chante!
Turn number: 1 was Thread-1
Turn number: 4 was Thread-3
Turn number: 2 was Thread-4

Как вы можете видеть, нить, которая заняла 3-е место, была нитью-0, поэтому нить-0 была той, которая выполнила "барьерное действие".

Скажите, что выв состоянии назвать ваши темы:

thread.setName("My Thread " + i);

Затем вы можете выполнить действие с темой этого имени ... Я не знаю, насколько это возможно для вас.

2 голосов
/ 24 декабря 2011

Я думаю, что раздел документации о альтернативе барьерному действию Runnable, а не об особом способе его использования.Обратите внимание на то, как написано (выделено мной):

Если барьерное действие не зависит от приостановления действий сторон при его выполнении

Есливы указываете действие барьера как выполняемый, затем оно ...

запускается один раз для каждой точки барьера после прибытия последнего потока в партии, но до того, как какие-либо потоки будут освобождены

Итак, пока потоки приостановлены (хотя он запускается последним прибывающим потоком, этот поток не приостанавливается; но по крайней мере его нормальный поток выполнения приостанавливается до завершения действия барьера).

Бизнес с использованием возвращаемого значения await() - это то, что вы можете сделать, если вам не нужно, чтобы ваше действие выполнялось, пока потоки приостановлены.

Примеры документации являются ориентировочными.Пример использования барьерного действия Runnable координирует работу некоторых других потоков - объединяет строки и проверяет, выполнена ли работа.Другие потоки должны подождать, пока они узнают, есть ли у них больше работы.Таким образом, он должен работать, пока они приостановлены.Пример использования возвращаемого значения из await() - это регистрация.Другие потоки не зависят от ведения журнала.Таким образом, это может произойти, когда другие потоки начали выполнять больше работы.

1 голос
/ 24 декабря 2011

CyclicBarrier позволяет назначать поток с помощью ORDER:

Назначение потока, который возвращается в заданном порядке, возможно, если, как вы говорите, вы заключаете логику завершения барьера в условное условие, котороеявляется специфичным для индекса потока.Таким образом, ваша реализация выше будет работать в соответствии с документацией, которую вы цитировали.

Однако здесь возникает путаница - в документации говорится об идентичности потока с точки зрения порядка возврата к барьеру, а не идентичности объекта потока.Таким образом, поток 0 ссылается на 0-й поток для завершения.

Альтернатива: назначение потока с использованием других механизмов.

Если вы хотите, чтобы определенный поток выполнял определенное действие после завершения других работ, вы можете использовать другоймеханизм - как семафор, например.Если вам нужно такое поведение, вам может не понадобиться циклический барьер.

Чтобы проверить, что подразумевается под документацией, запустите класс (изменен с http://programmingexamples.wikidot.com/cyclicbarrier) ниже, где я включил ваш фрагмент.

Пример того, что подразумевается поддокументы для потока пакета CyclicBarrier

; импорт java.util.concurrent.BrokenBarrierException; импорт java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample
{
    private static int matrix[][] = 
    { 
        { 1 }, 
        { 2, 2 }, 
        { 3, 3, 3 },
        { 4, 4, 4, 4 }, 
        { 5, 5, 5, 5, 5 } };

    static final int rows = matrix.length;
    private static int results[]=new int[rows];


    static int threadId=0;
    private static class Summer extends Thread
    {
        int row;

        CyclicBarrier barrier;

        Summer(CyclicBarrier barrier, int row)
        {
            this.barrier = barrier;
            this.row = row;
        }

        public void run()
        {
            int columns = matrix[row].length;
            int sum = 0;
            for (int i = 0; i < columns; i++)
            {
                sum += matrix[row][i];
            }
            results[row] = sum;
            System.out.println("Results for row " + row + " are : " + sum);
            // wait for the others 
            // Try commenting the below block, and watch what happens. 
            try
            {
                int w = barrier.await();
                if(w==0)
                {
                    System.out.println("merging now !");
                    int fullSum = 0;
                    for (int i = 0; i < rows; i++)
                    {

                        fullSum += results[i];
                    }
                    System.out.println("Results are: " + fullSum);
                }
            }
            catch(Exception e)
            {
                e.printStackTrace();
            }
        }
    }
    public static void main(String args[])
    {
        /*
         * public CyclicBarrier(int parties,Runnable barrierAction)
         * Creates a new CyclicBarrier that will trip when the given number
         * of parties (threads) are waiting upon it, and which will execute 
         * the merger task when the barrier is tripped, performed 
         * by the last thread entering the barrier.
         */
        CyclicBarrier barrier = new CyclicBarrier(rows );
        for (int i = 0; i < rows; i++)
        {
            System.out.println("Creating summer " + i);
            new Summer(barrier, i).start();

        }
        System.out.println("Waiting...");
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...