Я использую ThreadPoolExecutor с LinkedBlockingQueue (Integer.MAX_VALUE) для нескольких задач, но почему он вызывает RejectedExecutionException при отправке (Callable задача) в 2000 задач в веб-сфере? Разве очередь не должна содержать 2,1 миллиарда задач теоретически? Любая информация будет оценена.
Два запроса мыла, отправленные в мое приложение, будут выполнять две разные задачи, каждая из этих двух задач обрабатывается классом задания, и будет экземпляр настраиваемого ServiceExecutionFactory (bean-компонент с прототипом), который действует как фабрика потребителя и производителя. Фабрика имеет поле workQueue (размер 1000), в котором содержатся задачи, созданные классом заданий, и потребитель возьмет задачу в workQueue и поместит задачу в пул потоков. Кстати, я не могу воспроизвести это с Tomcat.
экземпляр ThreadPoolExecutor
BlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<Runnable>();
ThreadPoolExecutor tpe = (new ThreadPoolExecutor(poolSize, poolSize, 10, TimeUnit.SECONDS, executorQueue, tf));
tpe.allowCoreThreadTimeOut(true);
executorService.submit (serviceInfo.getCallable ()) может вызвать RejectedExecutionException, поскольку все потоки заняты (включая внешние потоки), а очередь заполнена (я не знаю, почему это происходит, как я упоминал выше). Затем программа поймает исключение и попытается добавить его обратно в очередь, в этот момент workQueue (размер 1000) ServiceExecutionFactoryBase может быть полностью заполнен и не сможет добавить его, она выдаст исключение полной очереди с помощью add () метод. Еще одна вещь, которую я чувствую также странным, что ConsumerService будет переработан через 3 часа, но этот поток ConsumerService должен остановиться из-за оператора throw, не так ли?
public class ConsumerService implements Callable<Object> {
public Object call() throws Exception {
// loop until interrupted
try {
while (true) {
// check the work queue for available item
ServiceInfo serviceInfo = queue.take();
// When an item is available, feed it to the executor, and save the future
try {
Future<Object> future = executorService.submit(serviceInfo.getCallable());
serviceInfo.setFuture(future);
serviceInfo.setCallable(null);
} catch (Exception e) {
if (serviceInfo.getRetrys() < getMaximumRequestRetries()) {
serviceInfo.setRetrys(serviceInfo.getRetrys() + 1);
queue.add(serviceInfo);
} else {
ServiceCaller sc = serviceInfo.getCallable();
sc.factory.notifyServiceAborted(sc.serviceKey, e);
}
}
}
} catch (InterruptedException e) {
// the job is done
return null;
} catch (Exception e) {
getLogger().error(e);
throw e;
}
}
}
Я ожидаю, что RejectExecutionException не возникает, но это происходит.