ThreadPoolExecutor Dynami c выполнение задачи, дождаться завершения всех задач - PullRequest
1 голос
/ 28 января 2020

У меня есть ThreadPoolExecutor как таковое

ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());

Задачи выполняются следующим образом

executor.execute(task)

Теперь каждая задача может также выполнить больше задач одному и тому же исполнителю, и эти новые задачи могут отправить больше задач

Проблема в том, что я хочу, чтобы основной поток дождался выполнения всех задач, а затем вызвал shutdown

Гарантирован ли следующий подход к работе? (т.е. блокировать / ждать основной поток, пока все задачи не будут выполнены)

while (executor.getCompletedTaskCount() < executor.getTaskCount()) {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        LOGGER.error("Exception in atomic Count wait thread sleep", e);
        break;
      }
    }
}

Будет ли это в конечном итоге вырваться из l oop? Просто путем предварительного тестирования я обнаружил, что он работает даже с исключениями в потоке

PS Я не могу использовать защелку, потому что я не знаю ни количества заданий заранее, ни принятого ответа здесь

Ответы [ 3 ]

2 голосов
/ 28 января 2020

Вероятно, вы должны сохранить отправленное будущее.

Deque<Future<?>> futures = new ConcurrentLinkedDeque<>();

Затем каждый раз, когда вы отправляете задание.

futures.add(executor.submit( runnable, "Doesn't Really Matter, but Can be Useful"));

Затем в вашем основном потоке, который ожидает.

while(futures.size()>0){
    futures.pop().get();
}

Это даст вам гарантию того, что .get не будет завершено до тех пор, пока задача не будет завершена, и если больше задач добавляются другой задачей, тогда фьючерсы будут отражать изменения до завершения исходной задачи.

1 голос
/ 28 января 2020

По моему мнению, это не будет детерминировано c, чтобы получить фактическое количество задач по той причине, что во время выполнения задач вызывается метод execute и может произойти одно из следующих 3 условий. 1. Задача начинает выполняться (добавляется в Workers) 2. Задача ставится в очередь (добавляется в WorkQueue) 3. Задача отклоняется по мере исчерпания емкости WorkerQueue, исчерпания емкости Workers и ресурсов

 /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

getTaskCount () и getCompletedTaskCount () методы защищены mainLock, поэтому мы знаем, будут ли внутренние потоки, все еще отправляющие задачи исполнителю, проверяться временем (while (executor.getCompletedTaskCount() < executor.getTaskCount())) в основном выполнении. Это условие может привести к ошибочному положительному результату, который заканчивается неверным результатом.

/**
     * Returns the approximate total number of tasks that have ever been
     * scheduled for execution. Because the states of tasks and
     * threads may change dynamically during computation, the returned
     * value is only an approximation.
     *
     * @return the number of tasks
     */
    public long getTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers) {
                n += w.completedTasks;
                if (w.isLocked())
                    ++n;
            }
            return n + workQueue.size();
        } finally {
            mainLock.unlock();
        }
    }
    /**
     * Returns the approximate total number of tasks that have
     * completed execution. Because the states of tasks and threads
     * may change dynamically during computation, the returned value
     * is only an approximation, but one that does not ever decrease
     * across successive calls.
     *
     * @return the number of tasks
     */
    public long getCompletedTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers)
                n += w.completedTasks;
            return n;
        } finally {
            mainLock.unlock();
        }
    }

** Используемые здесь фрагменты кода взяты из JDK 1.8 222

0 голосов
/ 28 января 2020

Методы, используемые для получения завершенного и отправленного количества, т. Е. executor.getCompletedTaskCount() & executor.getTaskCount(), не всегда обеспечивают 100% точный счет в соответствии с Java (8) документами, поэтому подход может работать не всегда.

public long getTaskCount()

Возвращает приблизительное общее количество задач, которые когда-либо были запланированы для выполнения. Поскольку состояния задач и потоков могут динамически изменяться во время вычислений, возвращаемое значение является только приблизительным .

public long getCompletedTaskCount()

Возвращает приблизительно общее количество задач, которые завершили выполнение. Поскольку состояния задач и потоков могут динамически изменяться во время вычислений, возвращаемое значение является только приблизительным, но оно не уменьшается при последовательных вызовах .

...