Java ExecutorService: awaitTermination всех рекурсивно созданных задач - PullRequest
32 голосов
/ 10 февраля 2011

Я использую ExecutorService для выполнения задачи. Эта задача может рекурсивно создавать другие задачи, отправленные на тот же ExecutorService, и эти дочерние задачи тоже могут это делать.

Теперь у меня проблема с тем, что я хочу подождать, пока все задачи будут выполнены (то есть все задачи завершены, и они не отправили новые), прежде чем я продолжу.

Я не могу вызвать ExecutorService.shutdown() в главном потоке, потому что это препятствует принятию новых задач ExecutorService.

И Вызов ExecutorService.awaitTermination(), кажется, ничего не делает, если shutdown не был вызван.

Так что я как бы застрял здесь. 1015 * не может быть так сложно увидеть, что все рабочие бездействуют, не так ли? Единственное неуместное решение, которое я мог придумать, - это напрямую использовать ThreadPoolExecutor и периодически запрашивать его getPoolSize(). Неужели нет лучшего способа сделать это?

Ответы [ 11 ]

17 голосов
/ 10 февраля 2011

Это действительно идеальный кандидат для фазера. Java 7 выходит с этим новым классом. Это гибкий CountdonwLatch / CyclicBarrier. Вы можете получить стабильную версию по адресу JSR 166 Interest Site .

Более гибкая функция CountdownLatch / CyclicBarrier заключается в том, что она способна не только поддерживать неизвестное количество сторон (потоков), но и многократно использоваться (там, где входит фазовая часть)

Для каждого отправляемого вами задания вы регистрируетесь, когда вы его завершаете. Это можно сделать рекурсивно.

Phaser phaser = new Phaser();
ExecutorService e = //

Runnable recursiveRunnable = new Runnable(){
   public void run(){
      //do work recursively if you have to

      if(shouldBeRecursive){
           phaser.register();
           e.submit(recursiveRunnable);
      }

      phaser.arrive();
   }
}

public void doWork(){
   int phase = phaser.getPhase();

   phaser.register();
   e.submit(recursiveRunnable);

   phaser.awaitAdvance(phase);
}

Редактировать: Спасибо @depthofreality за указание условия гонки в моем предыдущем примере. Я обновляю его так, чтобы выполняющийся поток только ожидал продвижения текущей фазы, поскольку он блокирует выполнение рекурсивной функции.

Номер фазы не сработает, пока не наберется число arrive s == register s. Поскольку перед каждым вызовом рекурсивного вызова register при завершении всех вызовов будет происходить приращение фазы.

17 голосов
/ 10 февраля 2011

Если число задач в дереве рекурсивных задач изначально неизвестно, возможно, самый простой способ - реализовать собственный примитив синхронизации, своего рода «обратный семафор», и разделить его среди своих задач.Перед отправкой каждой задачи вы увеличиваете значение, когда задача завершается, оно уменьшает это значение, и вы ждете, пока значение не станет равным 0.

Реализация его как отдельного примитива, явно вызванного из задач, отделяет эту логику от потокареализация пула и позволяет вам отправлять несколько независимых деревьев рекурсивных задач в один и тот же пул.

Примерно так:

public class InverseSemaphore {
    private int value = 0;
    private Object lock = new Object();

    public void beforeSubmit() {
        synchronized(lock) {
            value++;
        }
    }

    public void taskCompleted() {
        synchronized(lock) {
            value--;
            if (value == 0) lock.notifyAll();
        }
    }

    public void awaitCompletion() throws InterruptedException {
        synchronized(lock) {
            while (value > 0) lock.wait();
        }
    }
}

Обратите внимание, что taskCompleted() должен вызываться внутри finallyблокировать, чтобы сделать его невосприимчивым к возможным исключениям.

Также обратите внимание, что beforeSubmit() должен вызываться отправляющим потоком перед отправкой задачи, а не самой задачей, чтобы избежать возможного "ложного завершения", когда старые задачи завершены, а новые еще не запущены.

РЕДАКТИРОВАТЬ: Исправлена ​​важная проблема с шаблоном использования.

6 голосов
/ 10 февраля 2011

Ух ты, ребята, вы быстрые:)

Спасибо за все предложения. Фьючерсы не легко интегрируются с моей моделью, потому что я не знаю, сколько запланированных выпусков запланировано заранее. Поэтому, если я оставлю родительскую задачу в живых, просто дождавшись завершения ее рекурсивных дочерних задач, у меня будет много мусора.

Я решил свою проблему, используя предложение AtomicInteger. По сути, я подклассифицировал ThreadPoolExecutor и увеличивал счетчик на вызовы execute () и уменьшал на вызовы afterExecute (). Когда счетчик получает 0, я вызываю shutdown (). Кажется, это работает для моих проблем, но я не уверен, что это хороший способ сделать это. В частности, я предполагаю, что вы используете только execute () для добавления Runnables.

В качестве побочного узла: я сначала попытался проверить в afterExecute () количество Runnables в очереди и количество активных и отключенных рабочих, когда они равны 0; но это не сработало, потому что не все Runnables появились в очереди, а getActiveCount () также не выполнила то, что я ожидал.

Во всяком случае, вот мое решение: (если кто-то найдет серьезные проблемы с этим, пожалуйста, сообщите мне:)

public class MyThreadPoolExecutor extends ThreadPoolExecutor {

    private final AtomicInteger executing = new AtomicInteger(0);

    public MyThreadPoolExecutor(int coorPoolSize, int maxPoolSize, long keepAliveTime,
        TimeUnit seconds, BlockingQueue<Runnable> queue) {
        super(coorPoolSize, maxPoolSize, keepAliveTime, seconds, queue);
    }


    @Override
    public void execute(Runnable command) {
        //intercepting beforeExecute is too late!
        //execute() is called in the parent thread before it terminates
        executing.incrementAndGet();
        super.execute(command);
    }


    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        int count = executing.decrementAndGet();
        if(count == 0) {
            this.shutdown();
        }
    }

}
2 голосов
/ 01 мая 2011

Вы можете создать свой собственный пул потоков, который расширяет ThreadPoolExecutor . Вы хотите знать, когда задание было отправлено и когда оно завершилось.

public class MyThreadPoolExecutor extends ThreadPoolExecutor {
    private int counter = 0;

    public MyThreadPoolExecutor() {
        super(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    public synchronized void execute(Runnable command) {
        counter++;
        super.execute(command);
    }

    @Override
    protected synchronized void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        counter--;
        notifyAll();
    }

    public synchronized void waitForExecuted() throws InterruptedException {
        while (counter == 0)
            wait();
    }
}
1 голос
/ 10 февраля 2011

Используйте Future для ваших задач (вместо отправки Runnable), обратный вызов обновляет его состояние по завершении, поэтому вы можете использовать Future.isDone для отслеживания состояние всех ваших задач.

0 голосов
/ 21 июня 2019

Вы можете использовать бегунок, который отслеживает запущенные потоки:

Runner runner = Runner.runner(numberOfThreads);

runner.runIn(2, SECONDS, callable);
runner.run(callable);


// blocks until all tasks are finished (or failed)
runner.waitTillDone();


// and reuse it
runner.runRunnableIn(500, MILLISECONDS, runnable);


runner.waitTillDone();


// and then just kill it
runner.shutdownAndAwaitTermination();

, чтобы использовать его, просто добавьте зависимость:

compile 'com.github.matejtymes: javafixes: 1.3+0,0'

0 голосов
/ 02 сентября 2016

Единственное неуместное решение, которое я мог придумать, - это напрямую использовать ThreadPoolExecutor и периодически запрашивать его getPoolSize (). Неужели нет лучшего способа сделать это?

Вы должны использовать shutdown() , awaitTermination () and shutdownNow() методы в правильной последовательности.

shutdown(): Инициирует упорядоченное завершение работы, при котором выполняются ранее отправленные задачи, но новые задачи не принимаются.

awaitTermination(): блокируется до тех пор, пока все задачи не завершат выполнение после запроса на выключение, или не истечет время ожидания, или текущий поток не будет прерван, в зависимости от того, что произойдет раньше.

shutdownNow(): Пытается остановить все активно выполняемые задачи, останавливает обработку ожидающих задач и возвращает список задач, ожидающих выполнения.

Рекомендованный способ со страницы документации оракула ExecutorService :

 void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }

Вы можете заменить условие if на условие while в случае большой продолжительности выполнения задач, как показано ниже:

Изменить

if (!pool.awaitTermination(60, TimeUnit.SECONDS))

К

 while(!pool.awaitTermination(60, TimeUnit.SECONDS)) {
     Thread.sleep(60000);
 }  

Вы можете обратиться к другим альтернативам (кроме join(), который может использоваться с автономной резьбой) в:

ждать, пока все потоки не завершат свою работу в java

0 голосов
/ 26 декабря 2014

Должен сказать, что описанные выше решения проблемы с рекурсивной задачей вызова и ожиданием задач конца подзаказа меня не удовлетворяют.Вот мое решение, вдохновленное оригинальной документацией Oracle: CountDownLatch и пример там: Кадровые ресурсы CountDownLatch .

Первый общий поток в процессе в экземпляре класса HRManagerCompactимеет ожидающую защелку для двух дочерних потоков, которая имеет ожидающую защелку для их последующих двух дочерних потоков ... и т. д.

Конечно, для защелки может быть установлено значение, отличное от 2 (в конструкторе CountDownLatch), так каккроме того, число запускаемых объектов может быть установлено в итерации, т. е. ArrayList, но оно должно соответствовать (количество обратных отсчетов должно быть равно параметру в конструкторе CountDownLatch).

Будьте осторожны, число защелок возрастает экспоненциальноусловие ограничения: 'level.get () <2', а также количество объектов.1, 2, 4, 8, 16 ... и защелки 0, 1, 2, 4 ... Как видите, для четырех уровней (level.get () <4) будет 15 ожидающих потоков и 7 защелокв то время, когда работает пик 16 потоков. </p>

package processes.countdownlatch.hr;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Recursively latching running classes to wait for the peak threads
 *
 * @author hariprasad
 */
public class HRManagerCompact extends Thread {
  final int N = 2; // number of daughter's tasks for latch
  CountDownLatch countDownLatch;
  CountDownLatch originCountDownLatch;
  AtomicInteger level = new AtomicInteger(0);
  AtomicLong order = new AtomicLong(0); // id latched thread waiting for

  HRManagerCompact techLead1 = null;
  HRManagerCompact techLead2 = null;
  HRManagerCompact techLead3 = null;

// constructor
public HRManagerCompact(CountDownLatch countDownLatch, String name,
    AtomicInteger level, AtomicLong order){
  super(name);
  this.originCountDownLatch=countDownLatch;
  this.level = level;
  this.order = order;
 }

 private void doIt() {
    countDownLatch = new CountDownLatch(N);
    AtomicInteger leveli = new AtomicInteger(level.get() + 1);
    AtomicLong orderi = new AtomicLong(Thread.currentThread().getId());
    techLead1 = new HRManagerCompact(countDownLatch, "first", leveli, orderi);
    techLead2 = new HRManagerCompact(countDownLatch, "second", leveli, orderi);
    //techLead3 = new HRManagerCompact(countDownLatch, "third", leveli);

    techLead1.start();
    techLead2.start();
    //techLead3.start();

    try {
     synchronized (Thread.currentThread()) { // to prevent print and latch in the same thread
       System.out.println("*** HR Manager waiting for recruitment to complete... " + level + ", " + order + ", " + orderi);
       countDownLatch.await(); // wait actual thread
     }
     System.out.println("*** Distribute Offer Letter, it means finished. " + level + ", " + order + ", " + orderi);
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
  }

 @Override
 public void run() {
  try {
   System.out.println(Thread.currentThread().getName() + ": working... " + level + ", " + order + ", " + Thread.currentThread().getId());
   Thread.sleep(10*level.intValue());
   if (level.get() < 2) doIt();
   Thread.yield();
  }
  catch (Exception e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
  /*catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }*/
  // TODO Auto-generated method stub
  System.out.println("--- " +Thread.currentThread().getName() + ": recruted " + level + ", " + order + ", " + Thread.currentThread().getId());
  originCountDownLatch.countDown(); // count down
 }

 public static void main(String args[]){
  AtomicInteger levelzero = new AtomicInteger(0);
  HRManagerCompact hr = new HRManagerCompact(null, "zero", levelzero, new AtomicLong(levelzero.longValue()));
  hr.doIt();
 }
}

Возможный закомментированный вывод (с некоторой вероятностью):

first: working... 1, 1, 10 // thread 1, first daughter's task (10)
second: working... 1, 1, 11 // thread 1, second daughter's task (11)
first: working... 2, 10, 12 // thread 10, first daughter's task (12)
first: working... 2, 11, 14 // thread 11, first daughter's task (14)
second: working... 2, 11, 15 // thread 11, second daughter's task (15)
second: working... 2, 10, 13 // thread 10, second daughter's task (13)
--- first: recruted 2, 10, 12 // finished 12
--- first: recruted 2, 11, 14 // finished 14
--- second: recruted 2, 10, 13  // finished 13 (now can be opened latch 10)
--- second: recruted 2, 11, 15  // finished 15 (now can be opened latch 11)
*** HR Manager waiting for recruitment to complete... 0, 0, 1
*** HR Manager waiting for recruitment to complete... 1, 1, 10
*** Distribute Offer Letter, it means finished. 1, 1, 10 // latch on 10 opened
--- first: recruted 1, 1, 10 // finished 10
*** HR Manager waiting for recruitment to complete... 1, 1, 11
*** Distribute Offer Letter, it means finished. 1, 1, 11 // latch on 11 opened
--- second: recruted 1, 1, 11  // finished 11 (now can be opened latch 1)
*** Distribute Offer Letter, it means finished. 0, 0, 1  // latch on 1 opened
0 голосов
/ 03 мая 2011

Если вы хотите использовать классы JSR166y - например, Phaser или Fork / Join - любой из них может работать для вас, вы всегда можете загрузить их бэкпорт Java 6 с: http://gee.cs.oswego.edu/dl/concurrency-interest/ и использовать его в качестве основы, а не писать полностью самодельное решение. Затем, когда выйдет 7, вы можете просто удалить зависимость от бэкпорта и изменить несколько имен пакетов.

(Полное раскрытие информации: мы уже некоторое время используем LinkedTransferQueue в prod. Никаких проблем)

0 голосов
/ 01 мая 2011

(mea culpa: «немного» после моего сна;) но вот первая попытка динамической защелки):

package oss.alphazero.sto4958330;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class DynamicCountDownLatch {
    @SuppressWarnings("serial")
    private static final class Sync extends AbstractQueuedSynchronizer {
        private final CountDownLatch toplatch;
        public Sync() {
            setState(0);
            this.toplatch = new CountDownLatch(1);
        }

        @Override
        protected int tryAcquireShared(int acquires){
            try {
                toplatch.await();
            } 
            catch (InterruptedException e) {
                throw new RuntimeException("Interrupted", e);
            }
            return getState() == 0 ? 1 : -1;
        }
        public boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc)) 
                    return nextc == 0;
            }
        }
        public boolean tryExtendState(int acquires) {
            for (;;) {
                int s = getState();
                int exts = s+1;
                if (compareAndSetState(s, exts)) {
                    toplatch.countDown();
                    return exts > 0;
                }
            }
        }
    }
    private final Sync sync;
    public DynamicCountDownLatch(){
        this.sync = new Sync();
    }
    public void await() 
        throws InterruptedException   
    {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean await(long timeout, TimeUnit   unit) 
        throws InterruptedException   
    {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    public void countDown() {
        sync.releaseShared(1);
    }
    public void join() {
        sync.tryExtendState(1);
    }
}

Эта защелка вводит новый метод join () для существующего (клонированный) CountDownLatch API, который используется задачами для сигнализации о своем входе в большую группу задач.

Защелка передается от родительской задачи к дочерней.Каждая задача в соответствии с шаблоном Сураджа сначала «присоединяет ()» защелку, выполняет свою задачу (), а затем countDown ().

Для решения ситуаций, когда основной поток запускает группу задач, а затем немедленно ожидает() - до того, как какой-либо из потоков задач успел даже присоединиться () - topLatch используется во внутреннем Sync классе.Это защелка, которая будет отсчитываться при каждом соединении ();конечно, важен только первый обратный отсчет, поскольку все последующие являются nops.

В первоначальной реализации, приведенной выше, вводится семантическая морщинка, поскольку tryAcquiredShared (int) не должен вызывать исключение InterruptedException, но тогданужно иметь дело с прерыванием по ожиданию на topLatch.

Является ли это улучшением по сравнению с собственным решением OP, использующим атомные счетчики?Я бы сказал, что, вероятно, не IFF, он настаивает на использовании Executors, но, как я полагаю, это в равной степени обоснованный альтернативный подход, использующий AQS в этом случае, и его также можно использовать с общими потоками.

Отбрасывайте друзей-хакеров.

...