Пружинное пакетное задание с многопоточностью не работает - PullRequest
0 голосов
/ 26 марта 2020
@Autowired
private JobLauncher jobLauncher;

@Autowired
private Job processJob;

private ExecutorService executorService;

@Override
public AiResponseDTO executeAiRules(AiRequestDTO aiRequestDTO) throws Exception {
executorService = Executors.newFixedThreadPool(aiRequestDTO.getAiRuleTypes().size());
            for(AiRuleType aiRuleType: aiRequestDTO.getAiRuleTypes()) {
                if(aiRuleType!=AiRuleType.HVAC) {
                    System.out.println("Start : "+aiRuleType.getValue()+" : "+new Date());
                    try {
                        Future<?> submit = executorService.submit(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    //add custom job parameters and start job
                                    JobParameters jobParameters = new JobParametersBuilder()
                                            .addLong("time", System.currentTimeMillis())
                                            .addDouble("random", Math.random())
                                            .addParameter("customparam",
            new AiJobCustomParameter(aiRequestDTO.getRequestedFile(),
                                                            aiRequestDTO.getAiEmsType(),
                                                            aiRuleType,
                                                            zipFileExtractedPath,
                                                            tmpFileStorePath)).toJobParameters();
                                    jobLauncher.run(processJob,jobParameters);
                                } catch (JobExecutionAlreadyRunningException | JobRestartException
                                        | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
                                    // TODO Auto-generated catch block
                                    e.printStackTrace();
                                }
                            }
                        });
                        aiMap.put(aiRuleType, submit);
                    }
                    catch (Exception e) {
                        throw e;
                    }
                }
            }

Я пытаюсь запустить несколько экземпляров задания параллельно с многопоточностью для выполнения другой задачи. Также я использую threadPoolTaskExecuter для рабочих мест и использую MultiResourcePartitioner для разделения файлов CSV. но это не работает так, как должно быть. Каждое следующее задание ожидает завершения до sh предыдущего.

@Configuration
public class AiJobBatchConfig {

@Autowired
public JobBuilderFactory jobBuilderFactory;

@Autowired
public StepBuilderFactory stepBuilderFactory;

private AiJobCustomParameter jobParameters;

@Bean
public Job processJob() throws IOException, UnexpectedInputException, ParseException {
    return jobBuilderFactory.get("processJob").incrementer(
new RunIdIncrementer()).listener(new AiJobListener())
            //.flow(masterStep())
            .start(masterStep())
            //.end()
            .build();
}

@Bean
public Step masterStep() throws IOException, UnexpectedInputException, ParseException{
    return stepBuilderFactory.get("masterStep").listener(new StepExecutionListener() {

        @Override
        public void beforeStep(StepExecution stepExecution) {
            jobParameters = (AiJobCustomParameter)stepExecution.getJobExecution().getJobParameters().getParameters().get("customparam");
        }

        @Override
        public ExitStatus afterStep(StepExecution stepExecution) {
            return null;
        }
    })
            .partitioner(executerStep().getName(),partitioner())
            .step(executerStep())
            .taskExecutor(jobLauncherTaskExecutor())
            .build();
}

@Bean
public Step executerStep() throws UnexpectedInputException, MalformedURLException, ParseException {
    return stepBuilderFactory.get("executerStep")
            .<String,Object> chunk(1000)
            .reader(new AiFileReader())
            .processor(new AiFileProcessor())
            .writer(new AiFileWriter())
            .build();
}

@Bean
@StepScope
public Partitioner partitioner() {
    MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
    ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
    List<Resource> resources = new ArrayList<Resource>();
    try {
        Resource[] resources2 = resolver.getResources("file:"+jobParameters.getZipFileExtractedPath()+"*");
        for(Resource resource : resources2) {
            //System.out.println("R1 : "+resource.getFilename());
            if(resource.getFilename().endsWith(".csv")) {
                resources.add(resource);
            }
            //check inside the folder and fetch files
            else if(!resource.getFilename().endsWith(".*")) {
                Resource[] resources3 = resolver.getResources(resource.getURI()+"/*");
                for(Resource resource2 : resources3) {
                        resources.add(resource2);
                }
            }
        }

    } catch (IOException e) {
        e.printStackTrace();
    }

    Resource [] resources2 =resources.toArray(new Resource[resources.size()]);
    if(resources2.length==0)
        logger.error("NO CSV FILE FOUND");
    logger.error("Resources : "+resources2.length);
    partitioner.setResources(resources2);
    return partitioner;
}

@Bean
public TaskExecutor jobLauncherTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setMaxPoolSize(15);
    return executor;
}
}

Если я использую corePoolSize, он позволяет выполнять параллельное выполнение для экземпляров задания, но не все файлы выполняются для каждого задания. Всего файлов делятся на экземпляры заданий.

1 Ответ

0 голосов
/ 26 марта 2020

По умолчанию Spring Batch использует SyncTaskExecutor для средства запуска заданий, которое является однопоточным. Вы не поделились конфигурацией своего модуля запуска заданий (поэтому, я думаю, используется используемый по умолчанию). Необходимо убедиться, что для вашего JobLauncher используется асинхронный исполнитель задач, например SimpleAsyncTaskExecutor (который подходит для целей тестирования, но не подходит для работы, так как не использует потоки повторно) или ThreadPoolTaskExecutor. Вот пример:

// this is your current task executor
@Bean
public TaskExecutor jobLauncherTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setMaxPoolSize(15);
    return executor;
}

@Bean
public JobLauncher jobLauncher(JobRepository jobRepository) throws Exception {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.setTaskExecutor(jobLauncherTaskExecutor());
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
}

Более подробную информацию можно найти в разделе Настройка JobLauncher справочной документации.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...