Потоки ThreadPoolExecutor не работают одновременно с использованием PriorityBlockingQueue - PullRequest
0 голосов
/ 02 июля 2018

Я использую java ThreadPoolExecutor для запуска параллельного выполнения потока. Я использовал ArrayBlockingQueue, чтобы держать потоки в очереди. Но теперь требование изменилось, и мне нужно добавить время выполнения потока (без ограничения размера), и оно должно быть приоритетным. Поэтому я решил использовать PriorityBlockingQueue вместо ArrayBlockingQueue с некоторой логикой сравнения. После использования PriorityBlockingQueue потоки работают последовательно один за другим, а не одновременно. Одновременно выполняется только один поток, а не то, каким будет число активных потоков. Пожалуйста, дайте мне знать, если у кого-нибудь есть какие-либо предложения, чтобы решить эту проблему и выполнить мое требование (поток должен быть добавлен в пул во время выполнения, и его выполнение должно основываться на приоритете).

Мой демонстрационный код:

//RejectedExecutionHandler implementation
    RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
    //Get the ThreadFactory implementation to use
    BlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>(50, ThreadComparator.getComparator());
    ThreadPoolExecutor executorPool = new ThreadPoolExecutor(1, activeThread, 10, TimeUnit.SECONDS, queue, threadFactory, rejectionHandler);
    //start the monitoring thread
    MyMonitorThread monitor = new MyMonitorThread(executorPool, 20, "Demo");
    Thread monitorThread = new Thread(monitor);
    monitorThread.start();

    for (int i = 0; i < totalThead; i++) {
        int prio = i % 3 == 0 ? 3 : 5;
        executorPool.execute(new MyThread("Thread-" + i, prio));        
    }

    // Inserting more threads in between concurrent execution.
    try {
        Thread.sleep(40000);
        for (int j = 101; j < 110; j++) {
            executorPool.execute(new MyThread("Thread-" + j, 2));
        }
    } catch (InterruptedException e1) {
        // TODO Auto-generated catch block
        e1.printStackTrace();
    }


    while(executorPool.getActiveCount() != 0) {
        try {
            Thread.sleep(10000); 
        } catch (InterruptedException e) {
            System.out.println("Error while thread sleeping: " + e);
        }
    }
    //shut down the pool
    executorPool.shutdown();
    //shut down the monitor thread
    try {
        Thread.sleep(5000); 
    } catch (InterruptedException e) {
        System.out.println("Error while thread sleeping: " + e);
    }
    monitor.shutdown();

 public abstract class ThreadComparator implements Comparator<Runnable>{

public static Comparator<Runnable> getComparator() {
    return new Comparator<Runnable>() {
        @Override
        public int compare(Runnable t1, Runnable t2) {
            CompareToBuilder compare = new CompareToBuilder();
            MyThread mt1 = (MyThread) t1;
            MyThread mt2 = (MyThread) t2;
            compare.append(mt1.getPriority(), mt2.getPriority());
            return compare.toComparison();
        }
    };
}

}

1 Ответ

0 голосов
/ 02 июля 2018

Это ожидаемое поведение ThreadPoolExecutor с неограниченной рабочей очередью.

Для цитирования ThreadPoolExecutor JavaDoc :

Ядро и максимальные размеры пула
ThreadPoolExecutor автоматически настроит размер пула [..]. Когда новая задача передается в метод execute (Runnable), и меньше чем выполняются потоки corePoolSize, создается новый поток для обрабатывать запрос, даже если другие рабочие потоки простаивают. Если там * больше чем corePoolSize, но меньше чем MaximumPoolSize потоки работает, новый поток будет создан, только если очередь заполнена . [...]

Поскольку вы определяете corePoolSize как 1, а PriorityBlockingQueue по сути является неограниченной очередью (которая никогда не может заполниться), у вас никогда не будет более одного потока.

Исправление - отрегулировать corePoolSize до необходимого количества потоков.

...