Синхронизация потоков пула ExecutorService - PullRequest
0 голосов
/ 01 июля 2018

я впервые использую пулы потоков, и я не совсем понимаю, как работает executorservice. Я помещаю водяные знаки поверх изображения и объединяю их в пустую картинку. Но даже если я использую только одну нить, она все равно будет рисовать только половину.

Это мой класс WorkerThread :

public class WorkerThread implements Runnable {

    BufferedImage source;
    BufferedImage toDraw;
    int x;
    int y;
    BufferedImage target;
    ParallelWatermarkFilter pf;

    public WorkerThread(BufferedImage source, BufferedImage toDraw, int x, int y, BufferedImage target){
        this.source = source;
        this.toDraw = toDraw;
        this.x = x;
        this.y = y;
        this.target = target;
        pf = new ParallelWatermarkFilter(source, 5);
    }

    @Override
    public void run() {
        pf.mergeImages(source, toDraw, x, y, target);
    }
}

И вот как я использую ExecutorService в моем FilterClass :

    public BufferedImage apply(BufferedImage input) {

        ExecutorService threadpool = Executors.newFixedThreadPool(numThreads);

                for (int w = 0; w < imgWidth; w += watermarkWidth) {
      for (int h = 0; h < imgHeight; h += watermarkHeight) {
            Runnable worker = new WorkerThread(input, watermark, w, h, result);
            System.out.println("WIDTH: " + w + "   HEIGHT: " + h);
            threadpool.execute(worker);
      }
    }

    threadpool.shutdown();

Не ждут ли потоки, пока не завершится один поток?

1 Ответ

0 голосов
/ 02 июля 2018

То, что ThreadPoolExecutor завершение работы и выполнение задачи / удаление рабочей очереди / получение из рабочей очереди, является редкостью. Таким образом, вы не можете полагаться на механизм прерывания потока или что-то еще. Все, что вам гарантировано, это:

Инициирует упорядоченное завершение работы, при котором ранее представленные задачи выполнено, но новые задачи не будут приняты. Призыв не имеет дополнительный эффект, если он уже выключен.

Этот метод не ожидает завершения ранее отправленных задач выполнение.

Чтобы углубиться в реализацию ThreadPoolExecutor, давайте взглянем на основной метод выполнения:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

Важнейшая часть здесь - это getTask(). Его фрагмент:

 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
     decrementWorkerCount();
     return null;
 }

Метод не синхронизирован и зависит только от порядка, заданного значением CAS ctl. ctl здесь - глобальное состояние пула, хранящееся внутри AtomicInteger (для неблокирующего получения атомарного ThreadPoolExecutor состояния).

Так что возможен следующий случай.

  1. Рабочая нить называется getTask
  2. Рабочий поток получил состояние выполнения пула. Это все еще RUNNING.
  3. Другой поток инициировал закрытие ордера и соответственно изменил ctl.
  4. Рабочий поток уже взял задачу из рабочей очереди.
...