`ThreadPoolTaskExecutor` Потоки не уничтожаются после выполнения в Spring - PullRequest
0 голосов
/ 02 ноября 2018

Я пытаюсь изменить Кварцевое последовательное выполнение на Параллельное выполнение.

Работает нормально, с точки зрения производительности, это кажется хорошим, но порожденные (созданные) потоки не уничтожаются.

Это все еще в Runnable состоянии; почему и как я могу это исправить? Пожалуйста, ведите меня.

enter image description here

Код здесь:

    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        logger.error("Result Processing executed");
        List<Object[]> lstOfExams = examService.getExamEntriesForProcessingResults();
        String timeZone = messageService.getMessage("org.default_timezone", null, Locale.getDefault());
        if(lstOfExams!=null&&!lstOfExams.isEmpty()){
            ThreadPoolTaskExecutor threadPoolExecuter = new ThreadPoolTaskExecutor();
            threadPoolExecuter.setCorePoolSize(lstOfExams.size());
            threadPoolExecuter.setMaxPoolSize(lstOfExams.size()+1);
            threadPoolExecuter.setBeanName("ThreadPoolTaskExecutor");
            threadPoolExecuter.setQueueCapacity(100);
            threadPoolExecuter.setThreadNamePrefix("ThreadForUpdateExamResult");
            threadPoolExecuter.initialize();

            for(Object[] obj : lstOfExams){
                if(StringUtils.isNotBlank((String)obj[2]) ){
                    timeZone = obj[2].toString();
                }
                try {
                    Userexams userexams=examService.findUserExamById(Long.valueOf(obj[0].toString()));
                    if(userexams.getExamresult()==null){
                        UpdateUserExamDataThread task=new UpdateUserExamDataThread(obj,timeZone);
                        threadPoolExecuter.submit(task);
                    }
//                  testEvaluator.generateTestResultAsPerEvaluator(Long.valueOf(obj[0].toString()), obj[4].toString(), obj[3]==null?null:obj[3].toString(),timeZone ,obj[5].toString() ,obj[1].toString()); 
//                  logger.error("Percentage Marks:::::"+result.getPercentageCatScore());
                } catch (Exception e) {
                    Log.error("Exception at ResultProcessingJob extends QuartzJobBean executeInternal(JobExecutionContext context) throws JobExecutionException",e);
                    continue;
                }

            }
            threadPoolExecuter.shutdown();
        }
}

UpdateUserExamDataThread .class

@Component
//@Scope(value="prototype", proxyMode=ScopedProxyMode.TARGET_CLASS)
//public class UpdateUserExamDataThread extends ThreadLocal<String> //implements Runnable {
public class UpdateUserExamDataThread implements Runnable {
    private Logger log = Logger.getLogger(UpdateUserExamDataThread.class);
    @Autowired
    ExamService examService;
    @Autowired
    TestEvaluator testEvaluator;
    private Object[] obj;
    private String timeZone;


    public UpdateUserExamDataThread(Object[] obj,String timeZone) {
        super();
        this.obj = obj;
        this.timeZone = timeZone;
    }

    @Override
    public void run() {
        String threadName=String.valueOf(obj[0]);
        log.info("UpdateUserExamDataThread Start For:::::"+threadName);
        testEvaluator.generateTestResultAsPerEvaluator(Long.valueOf(obj[0].toString()), obj[4].toString(), obj[3]==null?null:obj[3].toString(),timeZone ,obj[5].toString() ,obj[1].toString());
        //update examResult
        log.info("UpdateUserExamDataThread End For:::::"+threadName);
    }

}

TestEvaluatorImpl.java

@Override
    @Transactional
    public Examresult generateTestResultAsPerEvaluator(Long userExamId, String evaluatorType, String codingLanguage,String timeZoneFollowed ,String inctenceId ,String userId) {
        dbSchema = messageService.getMessage("database.default_schema", null, Locale.getDefault());

        try {
//Some Methods
return examResult;
}catch(Exception e){
log.erorr(e);
}
}

При необходимости могу предоставить файл дампа нити.

Ответы [ 5 ]

0 голосов
/ 05 декабря 2018

Просто необходимо увеличить приоритет потоков и создать количество потоков в соответствии с количеством ядер в процессоре.

protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        logger.error("Result Processing executed");
        List<Object[]> lstOfExams = examService.getExamEntriesForProcessingResults();
        String timeZone = messageService.getMessage("org.default_timezone", null, Locale.getDefault());
        int cores = Runtime.getRuntime().availableProcessors();
        if(lstOfExams!=null&&!lstOfExams.isEmpty()){
            ThreadPoolTaskExecutor threadPoolExecuter = new ThreadPoolTaskExecutor();
            threadPoolExecuter.setCorePoolSize(cores);
//          threadPoolExecuter.setMaxPoolSize(Integer.MAX_VALUE);
            threadPoolExecuter.setBeanName("ThreadPoolTaskExecutor");
//          threadPoolExecuter.setQueueCapacity(Integer.MAX_VALUE);
            threadPoolExecuter.setQueueCapacity(lstOfExams.size()+10);
            threadPoolExecuter.setThreadNamePrefix("ThreadForUpdateExamResult");
            threadPoolExecuter.setWaitForTasksToCompleteOnShutdown(true);
            threadPoolExecuter.setThreadPriority(10);
            threadPoolExecuter.initialize();


            for(Object[] obj : lstOfExams){
                if(StringUtils.isNotBlank((String)obj[2]) ){
                    timeZone = obj[2].toString();
                }
                try {
                    Userexams userexam=examService.findUserExamById(Long.valueOf(obj[0].toString()));
                    if(userexam.getExamresult()==null){
                        UpdateUserExamDataThread task=new UpdateUserExamDataThread(obj,timeZone,testEvaluator);
//                      threadPoolExecuter.submit(task);
                        threadPoolExecuter.execute(task);
                    }
//                  testEvaluator.generateTestResultAsPerEvaluator(Long.valueOf(obj[0].toString()), obj[4].toString(), obj[3]==null?null:obj[3].toString(),timeZone ,obj[5].toString() ,obj[1].toString()); 
//                  logger.error("Percentage Marks:::::"+result.getPercentageCatScore());
                } catch (Exception e) {
                    logger.error("Exception at ResultProcessingJob extends QuartzJobBean executeInternal(JobExecutionContext context) throws JobExecutionException",e);
                    continue;
                }
            }
                threadPoolExecuter.shutdown();
        }
}
0 голосов
/ 13 ноября 2018

Я подозреваю, что проблема заключается просто в том, что вы вызываете run () вместо execute () при порождении потока задач с помощью submit (). Вероятно, при использовании submit существует некоторое ожидание, что потоки убивают себя, когда задача завершается, а не завершается в конце метода run.

0 голосов
/ 08 ноября 2018

кажется, что вы создаете пул потоков для экзаменов того же размера, что не совсем оптимально.

    // Core pool size is = number of exams  
    threadPoolExecuter.setCorePoolSize(lstOfExams.size());

    // Max pool size is just 1 + exam size. 
    threadPoolExecuter.setMaxPoolSize(lstOfExams.size()+1);

Вы должны учитывать, что: - Если вы создали пул потоков и запустили его, сразу началось столько потоков, сколько указано в размере ядра.

  • Максимальный размер пула эффективен только тогда, когда вы отправляете больше, чем потоки основного пула могут обрабатывать прямо сейчас, И когда размер очереди заполнен (в данном случае 100). Это означает, что новый поток будет создан только тогда, когда количество отправленных заданий превысит 100 + размер экзамена.

В вашем случае я бы установил размер пула ядер 5 или 10 (на самом деле это зависит от того, сколько ядер у вашего целевого ЦП и / или насколько ограничен ввод / вывод представленных задач).

Максимальный размер пула может быть вдвое больше, но он не действует, пока очередь не заполнится.

Чтобы размер живых потоков уменьшился после выполненной работы, вам нужно установить 2 параметра.

  • setKeepAliveSeconds (int keepAliveSeconds) : которые позволяют автоматически отключать потоки, если они не используются в течение определенных секунд (по умолчанию 60 секунд, что является оптимальным), НО это обычно используется только закрыть потоки неосновных потоков пула.

  • Чтобы закрыть потоки основной части после keepAliveSeconds, вы должны установить setAllowCoreThreadTimeOut (boolean allowCoreThreadTimeOut) как true. Что обычно неверно, чтобы поддерживать основной пул до тех пор, пока приложение работает.

Надеюсь, это поможет.

0 голосов
/ 13 ноября 2018

Потоки не ждут ввода-вывода с какого-либо удаленного сервера, потому что выполняемый метод в потоках будет в некоторых классах драйверов jdbc, но все они в настоящее время находятся в UpdateUserExamDataThread.run (), строка 37.

Теперь вопрос: что за код в UpdateUserExamDataThread.java, строка 37? К сожалению, данный UpdateUserExamDataThread.java является неполным, и / или не соответствует действительности версии: объявление пакета отсутствует и заканчивается строкой 29.

0 голосов
/ 06 ноября 2018

Я подозреваю, что один из ваших потоков бесконечно ждет ответа на запрос ввода-вывода. Например, вы пытаетесь подключиться к удаленному хосту, на котором вы не установили тайм-аут соединения, и хост не отвечает. В этом случае вы можете принудительно завершить все выполняющиеся задачи, запустив метод shutdownNow базового ExecutorService, после чего вы можете проанализировать InterruptedIOException , выдаваемый вызывающими потоками.

Заменить

threadPoolExecuter.shutdown();

ниже, чтобы вы могли проверить ошибки.

ExecutorService executorService = threadPoolExecuter.getThreadPoolExecutor();
executorService.shutdownNow();

Это отправит прерывание сигнал всем работающим потокам.

...