Что завершает мой Java ExecutorService - PullRequest
7 голосов
/ 08 июля 2019

Первоначально я видел эту проблему с более сложным подклассом ThreadPoolExecutor, но я упростил, поэтому теперь содержит не намного больше, чем некоторая дополнительная отладка, и все еще получаю ту же проблему.

import com.jthink.songkong.cmdline.SongKong;
import com.jthink.songkong.ui.MainWindow;
import com.jthink.songkong.util.SongKongThreadFactory;

import java.util.concurrent.*;
import java.util.logging.Level;



public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor
{
    /**
     * Uses the default CallerRunsPolicy when queue is full
     *  @param workerSize
     * @param threadFactory
     * @param queue
     */
    public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, LinkedBlockingQueue<Runnable> queue)
    {
        super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, queue, threadFactory, new CallerRunsPolicy());
    }

    /**
     * Allow caller to specify the RejectedExecutionPolicy
     *  @param workerSize
     * @param threadFactory
     * @param queue
     * @param reh
     */
    public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, LinkedBlockingQueue<Runnable> queue, RejectedExecutionHandler reh)
    {
        super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, queue, threadFactory, reh);
    }

    @Override
    public <T> FutureCallable<T> newTaskFor(Callable<T> callable) {
        return new FutureCallable<T>(callable);
    }

    /**
     * Check not been paused
     *
     * @param t
     * @param r
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        SongKong.checkIn();
    }

    /**
     * After execution
     *
     * @param r
     * @param t
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t)
    {
        super.afterExecute(r, t);

        if (t == null && r instanceof Future<?>)
        {
            try
            {
              Object result = ((Future<?>) r).get();
            }
            catch (CancellationException ce)
            {
                t = ce;
            }
            catch (ExecutionException ee)
            {
                t = ee.getCause();
            }
            catch (InterruptedException ie)
            {
                Thread.currentThread().interrupt(); // ignore/reset
            }
        }
        if (t != null)
        {
            MainWindow.logger.log(Level.SEVERE, "AFTER EXECUTE---" + t.getMessage(), t);
        }
    }

    @Override
    protected void terminated()
    {
        //All tasks have completed either naturally or via being cancelled by timeout task so close the timeout task
        MainWindow.logger.severe("---Terminated:"+((SongKongThreadFactory)getThreadFactory()).getName());
        MainWindow.userInfoLogger.severe("---Terminated:"+((SongKongThreadFactory)getThreadFactory()).getName());
        StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
        for(StackTraceElement ste:stackTrace)
        {
            MainWindow.logger.log(Level.SEVERE, ste.toString());
        }
        for(StackTraceElement ste:stackTrace)
        {
            MainWindow.userInfoLogger.log(Level.SEVERE, ste.toString());
        }
    }

    @Override
    public void shutdown()
    {
        MainWindow.logger.severe("---Shutdown:"+((SongKongThreadFactory)getThreadFactory()).getName());
        MainWindow.userInfoLogger.severe("---Shutdown:"+((SongKongThreadFactory)getThreadFactory()).getName());
        StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
        for(StackTraceElement ste:stackTrace)
        {
            MainWindow.logger.log(Level.SEVERE, ste.toString());
        }
        for(StackTraceElement ste:stackTrace)
        {
            MainWindow.userInfoLogger.log(Level.SEVERE, ste.toString());
        }
        super.shutdown();
    }
}

ЭтоExecutorService используется следующим классом, который позволяет экземпляру асинхронно отправлять задачи, ExecutorService не следует отключать, пока все отправленные задачи не будут завершены.

package com.jthink.songkong.analyse.analyser;

import com.jthink.songkong.preferences.GeneralPreferences;
import com.jthink.songkong.ui.MainWindow;
import com.jthink.songkong.util.SongKongThreadFactory;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;

/**
 *  Sets a timeout of each task submitted and cancel them if take longer than the timeout
 *
 *  The timeout is set to 30 minutes, we only want to call if really broken, it should not happen under usual circumstances
 */
public class MainAnalyserService extends AnalyserService
{
    //For monitoring/controlling when finished
    private final AtomicInteger pendingItems = new AtomicInteger(0);
    private final CountDownLatch latch = new CountDownLatch(1);

    //If task has not completed 30 minutes after it started (added to queue) then it should be cancelled
    private static final int TIMEOUT_PER_TASK = 30;

    private static MainAnalyserService mas;

    public static MainAnalyserService getInstanceOf()
    {
        return mas;
    }

    public static MainAnalyserService create(String threadGroup)
    {
        mas = new MainAnalyserService(threadGroup);
        return mas;
    }

    public MainAnalyserService(String threadGroup)
    {
        super(threadGroup);
        initExecutorService();
    }

    /**
    Configure thread to match cpus but even if single cpu ensure have at least two threads to protect against
    scenario where there is only cpu and that thread is waiting on i/o rather than being cpu bound this would allow
    other thread to do something.
     */
    @Override
    protected void initExecutorService()
    {
        int workerSize = GeneralPreferences.getInstance().getWorkers();
        if(workerSize==0)
        {
            workerSize = Runtime.getRuntime().availableProcessors();
        }
        //Even if only have single cpu we still have multithread so we dont just have single thread waiting on I/O
        if(workerSize< MIN_NUMBER_OF_WORKER_THREADS)
        {
            workerSize = MIN_NUMBER_OF_WORKER_THREADS;
        }
        MainWindow.userInfoLogger.severe("Workers Configuration:"+ workerSize);
        MainWindow.logger.severe("Workers Configuration:"+ workerSize);

        executorService = new TimeoutThreadPoolExecutor(workerSize,
                new SongKongThreadFactory(threadGroup),
                new LinkedBlockingQueue<Runnable>(BOUNDED_QUEUE_SIZE),
                TIMEOUT_PER_TASK,
                TimeUnit.MINUTES,
                new EnsureIncreaseCountIfRunOnCallingThread());
    }

    public AtomicInteger getPendingItems()
    {
        return pendingItems;
    }

    /**
     * If queue is full this gets called and we log that we run task on local calling thread.
     */
    class EnsureIncreaseCountIfRunOnCallingThread implements RejectedExecutionHandler
    {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public EnsureIncreaseCountIfRunOnCallingThread() { }

        /**
         * Executes task on calling thread, ensuring we increment count
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown())
            {
                try
                {
                    MainWindow.userInfoLogger.severe(">>SubmittedLocally:" + ((FutureCallable) r).getCallable().getClass().getName() + ":" + pendingItems.get());
                    r.run();
                    MainWindow.userInfoLogger.severe(">>CompletedLocally:" + ((FutureCallable) r).getCallable().getClass().getName() + ":" +  pendingItems.get());
                }
                catch(Exception ex)
                {
                    MainWindow.userInfoLogger.log(Level.SEVERE, ex.getMessage(), ex);
                }
            }
        }
    }

    /**
     * Increase count and then Submit to ExecutorService
     *
     * @param callingTask
     * @param task
     */
    public void submit(Callable<Boolean> callingTask, Callable<Boolean> task) //throws Exception
    {
        //Ensure we increment before calling submit in case rejectionExecution comes into play
        int remainingItems = pendingItems.incrementAndGet();
        executorService.submit(task);
        MainWindow.userInfoLogger.severe(">>Submitted:" + task.getClass().getName() + ":" + remainingItems);
    }

    public ExecutorService getExecutorService()
    {
        return executorService;
    }

    /**
     * Must be called by Callable when it has finished work (or if error)
     *
     * @param task
     */
    public void workDone(Callable task)
    {
        int remainingItems = pendingItems.decrementAndGet();
        MainWindow.userInfoLogger.severe(">>WorkDone:" + task.getClass().getName() + ":" +remainingItems);
        if (remainingItems == 0)
        {
            MainWindow.userInfoLogger.severe(">Closing Latch:");
            latch.countDown();
        }
    }

    /**
     * Wait for latch to close, this should occur once all submitted aysync tasks have finished in some way
     *
     * @throws InterruptedException
     */
    public void awaitCompletion() throws InterruptedException{
        latch.await();
    }
}

Вызывающий класс имеет

//Just waits for all the async tasks on the list to complete/fail
analyserService.awaitCompletion();
MainWindow.userInfoLogger.severe(">MainAnalyser Completed");

Для одного клиента вызывался метод terminated(), несмотря на то, что есть еще задачи, которые не были выполнены, и служба executorservice работала только в течение 8 минут, и ни одна из задач не вышла из-под контроля.Я также видел проблему локально

Отладка показывает

UserLog

05/07/2019 11.29.38:EDT:SEVERE: ----G14922:The Civil War:8907617:American Songs of Revolutionary Times and the Civil War Era:NoScore
05/07/2019 11.29.38:EDT:SEVERE: >>Submitted:com.jthink.songkong.analyse.analyser.SongSaver:69
05/07/2019 11.29.38:EDT:SEVERE: >>WorkDone:com.jthink.songkong.analyse.analyser.DiscogsSongGroupMatcher:68
05/07/2019 11.29.38:EDT:SEVERE: >MainAnalyser Finished
05/07/2019 11.29.38:EDT:INFO: Stop

DebugLog

   05/07/2019 11.29.38:EDT:TimeoutThreadPoolExecutor:terminated:SEVERE: ---Terminated:Worker

Таким образом, мы видим, что есть еще 68 задаччтобы завершить, и MainAnalyser не закрыл защелку, но исполнитель threadpool завершил работу

Я переопределил shutdown (), чтобы увидеть, вызывается ли это, и нет, вызывается

terminate ()с помощью runWorker (), runWorker () должен продолжаться в цикле до тех пор, пока очередь не станет пустой, что не является, но что-то заставляет ее выходить из цикла, а processWorkerExit () после выполнения еще нескольких проверок в конечном итоге завершает работу всего Executor (не только рабочегоthread)

10/07/2019 07.11.51:BST:MainAnalyserService:submit:SEVERE: >>Submitted:com.jthink.songkong.analyse.analyser.DiscogsSongGroupMatcher:809
10/07/2019 07.11.51:BST:MainAnalyserService:workDone:SEVERE: >>WorkDone:com.jthink.songkong.analyse.analyser.MusicBrainzSongGroupMatcher2:808
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: ---Terminated:Worker
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.lang.Thread.getStackTrace(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: com.jthink.songkong.analyse.analyser.TimeoutThreadPoolExecutor.terminated(TimeoutThreadPoolExecutor.java:118)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.util.concurrent.ThreadPoolExecutor.tryTerminate(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.util.concurrent.ThreadPoolExecutor.processWorkerExit(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.lang.Thread.run(Unknown Source)

Поскольку ThreadPoolExecutor является частью стандартной Java, я не могу (легко) установить точки останова, чтобы попытаться выяснить, что он делает, это код ThreadPoolExecutor (стандартный Jave, а не мой код)

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

Мы экспериментировали с размером очереди в Executor, по умолчанию он был равен 100, потому что я не хотел, чтобы он становился слишком большим, как очередьsks будет использовать больше памяти, и я бы предпочел, чтобы вызывающие задачи выполнялись самостоятельно, если очередь занята.Но в попытке решить проблему (и убрать необходимость вызова CallerRunPolicy из-за переполнения очереди) я увеличил размер очереди до 1000, и это привело к более быстрому возникновению ошибки, а затем полностью удалил ограничение и продолжил быстро падать

 new LinkedBlockingQueue<Runnable>(BOUNDED_QUEUE_SIZE),

Я искал альтернативу ThreadExecutorPool и наткнулся на ForkJoinPool - https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinPool.html

Одна вещь, которую я заметил, состоит в том, что ForkJoinPool имеет разные методы для отправки задач из задачи, переданной в ForkJoinPool, по сравнению сотправка формы снаружи.Я не знаю, почему это так, но мне интересно, потому что я отправляю задачи изнутри задач, выполняемых Executor, что может каким-то образом вызвать проблему?

Теперь мне удалось создать собственную версию ThreadPoolExecutor, просто скопировав /вставка кода в новый класс, переименование, а также необходимость создания версии RejectedExcecutionhandler, которая ожидает мой класс, а не ThreadPoolExecutor и запустила его.

Начал добавлять отладку, чтобы посмотреть, смогу ли я расшифровать то, что происходит, есть идеи ?

Перед вызовом processWorkerExit Я добавил

 MainWindow.userInfoLogger.severe("-----------------------"+getTaskCount()
                    +":"+getActiveCount()
                    +":"+w.completedTasks
                    +":"+ completedAbruptly);

и получил по ошибке

-----------------------3686:0:593:false

Ответы [ 3 ]

2 голосов
/ 11 июля 2019

В течение долгого времени я думал, что проблема должна быть с моим кодом, затем я начал думать, что проблема была с ThreadPoolExecutor, но добавление отладки к моей собственной версии runWorker() показало, что проблема действительно была моим собственным кодом.

 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                MainWindow.userInfoLogger.severe("-----------------------"+workQueue.size());

Из этого я мог видеть, что в то время как рабочая очередь становилась в целом длиннее и соответствовала значению

MainThreadAnalyzer.pendingItems -noOfWorkerThreads

в определенный момент эти два значения разошлись, и это было тогда, когда процесс SongLoader (который по ошибке я на самом деле не рассматривал) завершился. Поэтому MainThreadAnalyzer продолжал отправлять работу, увеличивая значение pendingItems, но размер рабочей очереди Executor становился все меньше.

Это привело к осознанию того, что Executor раньше выключал (), но мы этого не осознавали, потому что проверяли только защелки после закрытия загрузчика песен.

И причина, по которой он отключился, заключалась в том, что на раннем этапе MainAnalyzerThread завершал работу быстрее, чем SongLoader отправлял ее, так что значение pendingItems было временно установлено на ноль, позволяя закрыть защелку.

Решение заключается в следующем

Добавить логический флаг, указывающий, когда songLoader завершен, и разрешать закрытие защелки только после установки этого флага.

private boolean songLoaderCompleted = false;
public void workDone(Callable task)
    {
        int remainingItems = pendingItems.decrementAndGet();
        MainWindow.logger.severe(">>WorkDone:" + task.getClass().getName() + ":" +remainingItems);

        if (remainingItems == 0 && songLoaderCompleted)
        {
            MainWindow.logger.severe(">Closing Latch:");
            latch.countDown();
        }
    }

Затем в основном потоке установите этот флаг после завершения SongLoader

 //Start SongLoader
ExecutorService songLoaderService = SongLoader.getExecutorService();
songLoaderService.submit(loader);

//SongLoader uses CompletionService when calls LoadFolderWorkers so shutdown wont return until all folder
//submissions completed to the MainAnalyserService
songLoaderService.shutdown();
songLoaderService.awaitTermination(10, TimeUnit.DAYS);
MainWindow.userInfoLogger.severe(">Song Loader Finished");

//Were now allowed to consider closing the latch because we know all songs have now been loaded
//so no false chance of zeroes
analyserService.setSongLoaderCompleted();

//Just waits for all the async tasks on the list to complete/fail
analyserService.awaitCompletion();
MainWindow.userInfoLogger.severe(">MainAnalyser Completed");

//This should be immediate as there should be no tasks still remaining
analyserService.getExecutorService().shutdown();
analyserService.getExecutorService().awaitTermination(10, TimeUnit.DAYS);
1 голос
/ 11 июля 2019

Вы просто неправильно используете ExecutorService.

Что вы делаете (даже в своем «решении»), это

  • Отправить задачи
  • Подождите, пока они закончат
  • Shutdown
  • Подождите, пока произойдет отключение (почему это на самом деле?)

Что вы должны сделать, это:

  • Отправить задачи
  • Завершить работу исполнителя, чтобы не допустить новых задач
  • Ожидание завершения - блокируется до тех пор, пока не будут выполнены все задачи или не истечет время ожидания

Вы должны проверить статус возврата awaitTermination, потому что

  • Если true - все задачи завершены до истечения заданного времени ожидания
  • Если false - не все задачи еще завершены - и, вероятно, вам не следует запускать второй пул в таком случае.

Также есть 2 варианта использования потокового исполнителя. Вы можете создавать рабочие потоки и позволять им решать, что они должны делать - как вы делали, зацикливаясь в рабочем потоке для новых задач

Или (что я предпочитаю), оберните все, что ваша работа должна выполнять, в отдельную задачу (наиболее вероятно, что у вас есть в теле цикла) и отправьте как отдельную задачу в пул. ExecutorService выполнит планирование за вас.

0 голосов
/ 12 июля 2019

вы можете изменить awaitCompletion так, чтобы он читал:

public void awaitCompletion() throws InterruptedException{
    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.DAYS);
}

может быть latch.await (), если попадет InterruptedException.

...