@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job processJob;
private ExecutorService executorService;
@Override
public AiResponseDTO executeAiRules(AiRequestDTO aiRequestDTO) throws Exception {
executorService = Executors.newFixedThreadPool(aiRequestDTO.getAiRuleTypes().size());
for(AiRuleType aiRuleType: aiRequestDTO.getAiRuleTypes()) {
if(aiRuleType!=AiRuleType.HVAC) {
System.out.println("Start : "+aiRuleType.getValue()+" : "+new Date());
try {
Future<?> submit = executorService.submit(new Runnable() {
@Override
public void run() {
try {
//add custom job parameters and start job
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.addDouble("random", Math.random())
.addParameter("customparam",
new AiJobCustomParameter(aiRequestDTO.getRequestedFile(),
aiRequestDTO.getAiEmsType(),
aiRuleType,
zipFileExtractedPath,
tmpFileStorePath)).toJobParameters();
jobLauncher.run(processJob,jobParameters);
} catch (JobExecutionAlreadyRunningException | JobRestartException
| JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
aiMap.put(aiRuleType, submit);
}
catch (Exception e) {
throw e;
}
}
}
Я пытаюсь запустить несколько экземпляров задания параллельно с многопоточностью для выполнения другой задачи. Также я использую threadPoolTaskExecuter для рабочих мест и использую MultiResourcePartitioner для разделения файлов CSV. но это не работает так, как должно быть. Каждое следующее задание ожидает завершения до sh предыдущего.
@Configuration
public class AiJobBatchConfig {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
private AiJobCustomParameter jobParameters;
@Bean
public Job processJob() throws IOException, UnexpectedInputException, ParseException {
return jobBuilderFactory.get("processJob").incrementer(
new RunIdIncrementer()).listener(new AiJobListener())
//.flow(masterStep())
.start(masterStep())
//.end()
.build();
}
@Bean
public Step masterStep() throws IOException, UnexpectedInputException, ParseException{
return stepBuilderFactory.get("masterStep").listener(new StepExecutionListener() {
@Override
public void beforeStep(StepExecution stepExecution) {
jobParameters = (AiJobCustomParameter)stepExecution.getJobExecution().getJobParameters().getParameters().get("customparam");
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return null;
}
})
.partitioner(executerStep().getName(),partitioner())
.step(executerStep())
.taskExecutor(jobLauncherTaskExecutor())
.build();
}
@Bean
public Step executerStep() throws UnexpectedInputException, MalformedURLException, ParseException {
return stepBuilderFactory.get("executerStep")
.<String,Object> chunk(1000)
.reader(new AiFileReader())
.processor(new AiFileProcessor())
.writer(new AiFileWriter())
.build();
}
@Bean
@StepScope
public Partitioner partitioner() {
MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
List<Resource> resources = new ArrayList<Resource>();
try {
Resource[] resources2 = resolver.getResources("file:"+jobParameters.getZipFileExtractedPath()+"*");
for(Resource resource : resources2) {
//System.out.println("R1 : "+resource.getFilename());
if(resource.getFilename().endsWith(".csv")) {
resources.add(resource);
}
//check inside the folder and fetch files
else if(!resource.getFilename().endsWith(".*")) {
Resource[] resources3 = resolver.getResources(resource.getURI()+"/*");
for(Resource resource2 : resources3) {
resources.add(resource2);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
Resource [] resources2 =resources.toArray(new Resource[resources.size()]);
if(resources2.length==0)
logger.error("NO CSV FILE FOUND");
logger.error("Resources : "+resources2.length);
partitioner.setResources(resources2);
return partitioner;
}
@Bean
public TaskExecutor jobLauncherTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setMaxPoolSize(15);
return executor;
}
}
Если я использую corePoolSize, он позволяет выполнять параллельное выполнение для экземпляров задания, но не все файлы выполняются для каждого задания. Всего файлов делятся на экземпляры заданий.