Java ThreadPoolExecutor зависает при использовании ArrayBlockingQueue - PullRequest
9 голосов
/ 17 июня 2010

Я работаю над некоторым приложением и использую ThreadPoolExecutor для обработки различных задач.ThreadPoolExecutor застревает через некоторое время.Чтобы смоделировать это в более простой среде, я написал простой код, в котором я могу смоделировать проблему.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyThreadPoolExecutor {
    private int poolSize = 10;
    private int maxPoolSize = 50;
    private long keepAliveTime = 10;
    private ThreadPoolExecutor threadPool = null;
    private final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(
            100000);

    public MyThreadPoolExecutor() {
        threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize,
                keepAliveTime, TimeUnit.SECONDS, queue);
        threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {

            @Override
            public void rejectedExecution(Runnable runnable,
                    ThreadPoolExecutor threadPoolExecutor) {
                System.out
                        .println("Execution rejected. Please try restarting the application.");
            }

        });
    }

    public void runTask(Runnable task) {
        threadPool.execute(task);
    }

    public void shutDown() {
        threadPool.shutdownNow();
    }
    public ThreadPoolExecutor getThreadPool() {
        return threadPool;
    }

    public void setThreadPool(ThreadPoolExecutor threadPool) {
        this.threadPool = threadPool;
    }

    public static void main(String[] args) {
        MyThreadPoolExecutor mtpe = new MyThreadPoolExecutor();
        for (int i = 0; i < 1000; i++) {
            final int j = i;
            mtpe.runTask(new Runnable() {

                @Override
                public void run() {
                    System.out.println(j);
                }

            });
        }
    }
}

Попробуйте выполнить этот код несколько раз.Обычно он выводит номер на консоль, и когда все потоки заканчиваются, он существует.Но иногда он выполнил все задачи, а затем не завершается.Дамп потока выглядит следующим образом:

MyThreadPoolExecutor [Java Application] 
    MyThreadPoolExecutor at localhost:2619 (Suspended)  
        Daemon System Thread [Attach Listener] (Suspended)  
        Daemon System Thread [Signal Dispatcher] (Suspended)    
        Daemon System Thread [Finalizer] (Suspended)    
            Object.wait(long) line: not available [native method]   
            ReferenceQueue<T>.remove(long) line: not available    
            ReferenceQueue<T>.remove() line: not available    
            Finalizer$FinalizerThread.run() line: not available 
        Daemon System Thread [Reference Handler] (Suspended)    
            Object.wait(long) line: not available [native method]   
            Reference$Lock(Object).wait() line: 485 
            Reference$ReferenceHandler.run() line: not available    
        Thread [pool-1-thread-1] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-2] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-3] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-4] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-6] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-8] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-5] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-10] (Suspended)   
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-9] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-7] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [DestroyJavaVM] (Suspended)  
    C:\Program Files\Java\jre1.6.0_07\bin\javaw.exe (Jun 17, 2010 10:42:33 AM)  

В моем реальном приложении потоки ThreadPoolExecutor переходят в это состояние и перестают отвечать.

С уважением, Рави Рао

1 Ответ

9 голосов
/ 17 июня 2010

В вашем методе main вы никогда не вызовете mtpe.shutdown(). ThreadPoolExecutor будет пытаться поддерживать его corePoolSize в течение неопределенного времени.Иногда вам везет, и у вас больше, чем corePoolSize потоков, так что каждый рабочий поток перейдет в ветвь условной логики, которая позволит ему завершиться по истечении заданного времени ожидания 10 секунд.Однако, как вы заметили, иногда это не так, поэтому каждый поток в исполнителе будет блокировать ArrayBlockingQueue.take () и ждать нового задания.

Также обратите вниманиесуществует существенная разница между ExecutorService.shutdown () и ExecutorService.shutdownNow () .Если вы вызовите ExecutorService.shutdownNow (), как показывает ваша реализация оболочки, вы будете иногда отбрасывать некоторые задачи, которые не были назначены для выполнения.

Обновление: После моего первоначального ответа, ThreadPoolExecutor реализация изменилась так, что программа в исходном посте никогда не должна выходить.

...