Как приостановить / возобновить все потоки в ExecutorService в Java? - PullRequest
21 голосов
/ 17 марта 2012

Я отправил кучу заданий в службу executorservice на Java, и я как-то хочу временно приостановить все эти задания.Какой лучший способ сделать это?Как я могу возобновить?Или я делаю это совершенно неправильно?Должен ли я следовать некоторому другому шаблону для того, чего я хочу достичь (например, возможность приостановить / возобновить выполнение служб)?

Ответы [ 5 ]

14 голосов
/ 18 марта 2012

Чтобы ответить на мой собственный вопрос, я нашел пример PausableThreadPoolExecutor в javadocs самого ThreadPoolExecutor . Вот моя версия с использованием мониторов Guava:

import com.google.common.util.concurrent.Monitor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;

public class PausableExecutor extends ScheduledThreadPoolExecutor {

    private boolean isPaused;

    private final Monitor monitor = new Monitor();
    private final Monitor.Guard paused = new Monitor.Guard(monitor) {
        @Override
        public boolean isSatisfied() {
            return isPaused;
        }
    };

    private final Monitor.Guard notPaused = new Monitor.Guard(monitor) {
        @Override
        public boolean isSatisfied() {
            return !isPaused;
        }
    };

    public PausableExecutor(int corePoolSize, ThreadFactory threadFactory) {
        super(corePoolSize, threadFactory);
    }

    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        monitor.enterWhenUninterruptibly(notPaused);
        try {
            monitor.waitForUninterruptibly(notPaused);
        } finally {
            monitor.leave();
        }
    }

    public void pause() {
        monitor.enterIf(notPaused);
        try {
            isPaused = true;
        } finally {
            monitor.leave();
        }
    }

    public void resume() {
        monitor.enterIf(paused);
        try {
            isPaused = false;
        } finally {
            monitor.leave();
        }
    }
}
9 голосов
/ 25 апреля 2013

Я высказал некоторые критические замечания по поводу вашего принятого ответа, но они не были очень конструктивными ... Итак, вот мое решение. Я хотел бы использовать такой класс, как этот, а затем вызывать checkIn везде / всегда, когда я хочу сделать паузу. Найдите его на GitHub !

import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Provides a mechanism to pause multiple threads.
 * If wish your thread to participate, then it must regularly check in with an instance of this object.
 * 
 * @author Corin Lawson <corin@phiware.com.au>
 */
public class Continue {
    private boolean isPaused;
    private ReentrantLock pauseLock = new ReentrantLock();
    private Condition unpaused = pauseLock.newCondition();

    public void checkIn() throws InterruptedException {
        if (isPaused) {
            pauseLock.lock();
            try {
                while (isPaused)
                    unpaused.await();
            } finally {
                pauseLock.unlock();
            }
        }
    }

    public void checkInUntil(Date deadline) throws InterruptedException {
        if (isPaused) {
            pauseLock.lock();
            try {
                while (isPaused)
                    unpaused.awaitUntil(deadline);
            } finally {
                pauseLock.unlock();
            }
        }
    }

    public void checkIn(long nanosTimeout) throws InterruptedException {
        if (isPaused) {
            pauseLock.lock();
            try {
                while (isPaused)
                    unpaused.awaitNanos(nanosTimeout);
            } finally {
                pauseLock.unlock();
            }
        }
    }

    public void checkIn(long time, TimeUnit unit) throws InterruptedException {
        if (isPaused) {
            pauseLock.lock();
            try {
                while (isPaused)
                    unpaused.await(time, unit);
            } finally {
                pauseLock.unlock();
            }
        }
    }

    public void checkInUninterruptibly() {
        if (isPaused) {
            pauseLock.lock();
            try {
                while (isPaused)
                    unpaused.awaitUninterruptibly();
            } finally {
                pauseLock.unlock();
            }
        }
    }

    public boolean isPaused() {
        return isPaused;
    }

    public void pause() {
        pauseLock.lock();
        try {
            isPaused = true;
        } finally {
            pauseLock.unlock();
        }
    }

    public void resume() {
        pauseLock.lock();
        try {
            if (isPaused) {
                isPaused = false;
                unpaused.signalAll();
            }
        } finally {
            pauseLock.unlock();
        }
    }
}

Например:

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;

public class PausableExecutor extends ScheduledThreadPoolExecutor {
    private Continue cont;

    public PausableExecutor(int corePoolSize, ThreadFactory threadFactory, Continue c) {
        super(corePoolSize, threadFactory);
        cont = c;
    }

    protected void beforeExecute(Thread t, Runnable r) {
        cont.checkIn();
        super.beforeExecute(t, r);
    }
}

Это дает дополнительное преимущество, заключающееся в том, что вы можете приостановить многие потоки одним вызовом Continue s pause.

5 голосов
/ 23 ноября 2014

Я искал функциональность паузы / возобновления в исполнителе, но с дополнительной возможностью ждать любых обрабатываемых в данный момент задач. Ниже представлен вариант других замечательных реализаций этого SO с добавлением функций await. Я тестировал его на исполнителе с одним потоком. Итак, основное использование:

executor.pause();
executor.await(10000); // blocks till current tasks processing ends

код класса:

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class PausableScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {      
  public boolean isPaused;
  private ReentrantLock pauseLock = new ReentrantLock();
  private Condition unpaused = pauseLock.newCondition();
  private Latch activeTasksLatch = new Latch();

  private class Latch {
    private final Object synchObj = new Object();
    private int count;

    public boolean awaitZero(long waitMS) throws InterruptedException {
      long startTime = System.currentTimeMillis();
      synchronized (synchObj) {
        while (count > 0) {
          if ( waitMS != 0) {
            synchObj.wait(waitMS);
            long curTime = System.currentTimeMillis();
            if ( (curTime - startTime) > waitMS ) {                
              return count <= 0;
            }
          }
          else
            synchObj.wait();
        }
        return count <= 0; 
      }
    }
    public void countDown() {
      synchronized (synchObj) {
        if (--count <= 0) {
          // assert count >= 0;              
          synchObj.notifyAll();
        }
      }
    }
    public void countUp() {
      synchronized (synchObj) {
        count++;
      }
    }    
  }

  /**
   * Default constructor for a simple fixed threadpool
   */
  public PausableScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize);
  }

  /**
   * Executed before a task is assigned to a thread.
   */
  @Override
  protected void beforeExecute(Thread t, Runnable r) {
    pauseLock.lock();
    try {
      while (isPaused)
        unpaused.await();
    } catch (InterruptedException ie) {
      t.interrupt();
    } finally {
      pauseLock.unlock();
    }

    activeTasksLatch.countUp();
    super.beforeExecute(t, r);
  }

  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    try {
      super.afterExecute(r, t);
    }
    finally {
      activeTasksLatch.countDown();
    }
  }

  /**
   * Pause the threadpool. Running tasks will continue running, but new tasks
   * will not start untill the threadpool is resumed.
   */
  public void pause() {
    pauseLock.lock();
    try {
      isPaused = true;
    } finally {
      pauseLock.unlock();
    }
  }

  /**
   * Wait for all active tasks to end.
   */ 
  public boolean await(long timeoutMS) {
    // assert isPaused;
    try {
      return activeTasksLatch.awaitZero(timeoutMS);
    } catch (InterruptedException e) {
      // log e, or rethrow maybe
    }
    return false;
  }

  /**
   * Resume the threadpool.
   */
  public void resume() {
    pauseLock.lock();
    try {
      isPaused = false;
      unpaused.signalAll();
    } finally {
      pauseLock.unlock();
    }
  }

}
4 голосов
/ 17 марта 2012

Проблема в том, что Runnable / Callable сами должны проверять, когда нужно приостановить / возобновить. При этом существует множество способов сделать это, и от ваших требований зависит, как лучше всего это сделать. Каким бы ни было ваше решение, вам нужно, чтобы ожидание было прерываемым, поэтому поток может быть аккуратно остановлен.

0 голосов
/ 26 июля 2018

Я знаю, что это старый, но я пробовал все эти ответы, и ни один из них не работал для того, что я пытался сделать с паузой таймера;все они выбросили бы все данные, которые он делал бы по расписанию, как только он возобновился (все сразу).

Вместо этого я нашел этот класс Timer на GitHub * здесь .Это очень хорошо сработало для меня.

* Я не писал этот код, просто нашел его.

...