Я столкнулся с этой проблемой параллелизма, которую я почесал в голове несколько дней.
По сути, я хочу, чтобы мой ThreadPoolExecutor ожидал завершения всех задач (количество неизвестных задач) перед тем, как завершить работу.
public class AutoShutdownThreadPoolExecutor extends ThreadPoolExecutor{
private static final Logger logger = Logger.getLogger(AutoShutdownThreadPoolExecutor.class);
private int executing = 0;
private ReentrantLock lock = new ReentrantLock();
private final Condition newTaskCondition = lock.newCondition();
private final int WAIT_FOR_NEW_TASK = 120000;
public AutoShutdownThreadPoolExecutor(int coorPoolSize, int maxPoolSize, long keepAliveTime,
TimeUnit seconds, BlockingQueue<Runnable> queue) {
super(coorPoolSize, maxPoolSize, keepAliveTime, seconds, queue);
}
@Override
public void execute(Runnable command) {
lock.lock();
executing++;
lock.unlock();
super.execute(command);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
try{
lock.lock();
int count = executing--;
if(count == 0) {
newTaskCondition.await(WAIT_FOR_NEW_TASK, TimeUnit.MILLISECONDS);
if(count == 0){
this.shutdown();
logger.info("Shutting down Executor...");
}
}
}
catch (InterruptedException e) {
logger.error("Sleeping task interrupted", e);
}
finally{
lock.unlock();
}
}
}
Предполагается, что задача проверяет счетчик задач (выполняет), и, если он равен 0, она на некоторое время блокируется, а затем снимает блокировку, чтобы другие задачи могли иметь возможностьвыполнить и не завершать работу исполнителя слишком рано.
Однако этого не происходит.Все 4 потока в исполнителе переходят в состояние ожидания:
"pool-1-thread-4" prio=6 tid=0x034a1000 nid=0x2d0 waiting on condition [0x039cf000]
"pool-1-thread-3" prio=6 tid=0x034d0400 nid=0x1328 waiting on condition [0x0397f000]
"pool-1-thread-2" prio=6 tid=0x03493400 nid=0x15ec waiting on condition [0x0392f000]
"pool-1-thread-1" prio=6 tid=0x034c3800 nid=0x1fe4 waiting on condition [0x038df000]
Если я добавлю инструкцию log (предполагается, что поток будет замедлен) в классе Runnable, проблема, похоже, исчезнет.
public void run() {
// logger.info("Starting task" + taskName);
try {
//doTask();
}
catch (Exception e){
logger.error("task " + taskName + " failed", e);
}
}
Вопрос похож на этот пост Java ExecutorService: awaitTermination всех рекурсивно созданных задач
Я принял первоначальное решение для постера и попытался обратиться к состоянию гонки в afterExecute () но это не работает.
Пожалуйста, помогите пролить свет на это.Спасибо.