RejectedExecutionException возникает при использовании LinkedBlockingQueue (Integer.MAX_VALUE) в ThreadPoolExecutor - PullRequest
0 голосов
/ 28 марта 2019

Я использую ThreadPoolExecutor с LinkedBlockingQueue (Integer.MAX_VALUE) для нескольких задач, но почему он вызывает RejectedExecutionException при отправке (Callable задача) в 2000 задач в веб-сфере? Разве очередь не должна содержать 2,1 миллиарда задач теоретически? Любая информация будет оценена.

Два запроса мыла, отправленные в мое приложение, будут выполнять две разные задачи, каждая из этих двух задач обрабатывается классом задания, и будет экземпляр настраиваемого ServiceExecutionFactory (bean-компонент с прототипом), который действует как фабрика потребителя и производителя. Фабрика имеет поле workQueue (размер 1000), в котором содержатся задачи, созданные классом заданий, и потребитель возьмет задачу в workQueue и поместит задачу в пул потоков. Кстати, я не могу воспроизвести это с Tomcat.

экземпляр ThreadPoolExecutor

BlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<Runnable>();
ThreadPoolExecutor tpe = (new ThreadPoolExecutor(poolSize, poolSize, 10, TimeUnit.SECONDS, executorQueue, tf));
tpe.allowCoreThreadTimeOut(true);

executorService.submit (serviceInfo.getCallable ()) может вызвать RejectedExecutionException, поскольку все потоки заняты (включая внешние потоки), а очередь заполнена (я не знаю, почему это происходит, как я упоминал выше). Затем программа поймает исключение и попытается добавить его обратно в очередь, в этот момент workQueue (размер 1000) ServiceExecutionFactoryBase может быть полностью заполнен и не сможет добавить его, она выдаст исключение полной очереди с помощью add () метод. Еще одна вещь, которую я чувствую также странным, что ConsumerService будет переработан через 3 часа, но этот поток ConsumerService должен остановиться из-за оператора throw, не так ли?

public class ConsumerService implements Callable<Object> {

       public Object call() throws Exception {

           // loop until interrupted
           try {
               while (true) {

                   // check the work queue for available item
                   ServiceInfo serviceInfo = queue.take();
                   // When an item is available, feed it to the executor, and save the future
                   try {
                       Future<Object> future = executorService.submit(serviceInfo.getCallable());
                       serviceInfo.setFuture(future);
                       serviceInfo.setCallable(null);
                   } catch (Exception e) {
                       if (serviceInfo.getRetrys() < getMaximumRequestRetries()) {
                           serviceInfo.setRetrys(serviceInfo.getRetrys() + 1);
                           queue.add(serviceInfo);
                       } else {
                           ServiceCaller sc = serviceInfo.getCallable();
                           sc.factory.notifyServiceAborted(sc.serviceKey, e);
                       }
                   }
               }
           } catch (InterruptedException e) {
               // the job is done
               return null;
           } catch (Exception e) {
               getLogger().error(e);
               throw e;
           }
       }

   }

Я ожидаю, что RejectExecutionException не возникает, но это происходит.

1 Ответ

0 голосов
/ 28 марта 2019

Используемые вами классы java.util.concurrent.ThreadPoolExecutor и java.util.concurrent.LinkedBlockingQueue предоставляются JDK и не должны иметь никакого отношения к тому, какой сервер приложений вы используете.Используете ли вы один и тот же JDK в обоих случаях?В этом случае другой возможной причиной может быть более высокое потребление памяти в одной из сред, так что ThreadPoolExecutor не может выделить новый поток и отклоняет запрос submit на этом основании.Даже если для corePoolSize установлено значение, равное MaximumPoolSize, вы также устанавливаете allowCoreThreadTimeOut, что позволяет отбрасывать количество потоков и требует создания новых потоков.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...