Я использую Spring Batch и Spring Cloud Task для автоматизации чтения/записи данных (чтение из файла и сохранение в MongoDB). В моём сценарии использования есть 2 шага (ещё один шаг будет добавлен после того, как заработают эти два). Я пытаюсь использовать удалённое разбиение (remote partitioning) с помощью DeployerPartitionHandler из Spring Cloud Task: насколько я понимаю, этот класс запускает рабочие (worker) узлы из главного (master) узла и используется вместо посредника на базе Spring Integration с ActiveMQ/RabbitMQ. Я создал два бина Partitioner и два бина PartitionHandler — по одному на каждый из двух моих шагов. Ниже приведён пример кода. Я получаю следующее исключение:
2020-03-11 12:03:59 - o.s.batch.core.step.AbstractStep - Encountered an error executing step step1 in job Job669228617
java.lang.NullPointerException: null
at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.launchWorker(DeployerPartitionHandler.java:347)
at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.launchWorkers(DeployerPartitionHandler.java:313)
at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.handle(DeployerPartitionHandler.java:302)
at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:106)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410)
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
at com.sun.proxy.$Proxy77.run(Unknown Source)
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.execute(JobLauncherCommandLineRunner.java:192)
@Configuration
@ComponentScan(basePackageClasses = DbConfig.class)
public JobConfig {
@Autowired
private TaskLauncher taskLauncher;
@Autowired
private JobExplorer jobExplorer;
@Autowired
private TaskRepository taskRepository;
@Autowired
private Reader1 reader2;
@Autowired
private Writer2 writer2;
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private DelegatingResourceLoader resourceLoader;
@Autowired
private ConfigurableApplicationContext context;
@Autowired
public JobRepository jobRepository;
@Autowired
private Environment environment;
private static final int GRID_SIZE = 2;
@Autowired
private Reader1 reader1;
@Autowired
private Writer2 writer2;
@Autowired
@Qualifier("partitionHandler1")
private PartitionHandler partitionHandler1;
@Autowired
@Qualifier("partitionHandler2")
private PartitionHandler partitionHandler2;
@Bean
@Profile("master")
public Job masterJob() {
Random random = new Random();
return this.jobBuilderFactory.get("masterJob" + random.nextInt())
.start(step1())
.next(step2())
.build();
}
@Bean
@Profile("master")
public Step step1() {
return this.stepBuilderFactory.get("step1")
.partitioner("slaveStep1", partitioner1())
.partitionHandler(partitionHandler1)
.taskExecutor(taskExecutor())
.build();
}
@Bean
@Profile("master")
public Step step2() {
return this.stepBuilderFactory.get("step2")
.partitioner("slaveStep2",partitioner2())
.partitionHandler(partitionHandler2)
.taskExecutor(taskExecutor())
.build();
}
@Bean
@Profile("worker")
public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
return new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);
}
@Bean
public Step slaveStep1() {
return this.stepBuilderFactory.get("slaveStep1")
.<Domain1, Domain1>chunk(50)
.reader(reader1)
.writer(writer1)
.listener(stepExecutionListner())
.build();
}
@Bean
public Step slaveStep2() {
return this.stepBuilderFactory.get("slaveStep2")
.<Domain2, Domain2>chunk(50)
.reader(reader2)
.writer(writer2)
.listener(stepExecutionListner())
.build();
}
@Bean
public Partitioner partitioner1() {
FilePartitioner filePartitioner = new FilePartitioner("classpath:input/test1*.csv");
return filePartitioner.getFilesPartitioner();
}
@Bean
public Partitioner partitioner2() {
FilePartitioner filePartitioner = new FilePartitioner("classpath:input/test2*.csv");
return filePartitioner.getFilesPartitioner();
}
@Bean(name="partitionHandler1")
public PartitionHandler partitionHandler1(TaskLauncher taskLauncher,
JobExplorer jobExplorer, TaskRepository taskRepository) {
Resource resource = this.resourceLoader.getResource("maven://com.abc:test:1.0-SNAPSHOT");
DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "ormBusUnitLoaderStep",taskRepository);
List<String> commandLineArgs = new ArrayList<>(3);
commandLineArgs.add("--spring.profiles.active=worker");
commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
commandLineArgs.add("--spring.batch.initializer.enabled=false");
partitionHandler.setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
SimpleEnvironmentVariablesProvider environmentVariablesProvider = new SimpleEnvironmentVariablesProvider(this.environment);
partitionHandler.setEnvironmentVariablesProvider(environmentVariablesProvider);
partitionHandler.setMaxWorkers(3);
partitionHandler.setApplicationName("Job");
return partitionHandler;
}
@Bean(name="partitionHandler2")
//@Scope(value = "prototype")
public PartitionHandler partitionHandler2(TaskLauncher taskLauncher,
JobExplorer jobExplorer, TaskRepository taskRepository) {
Resource resource = this.resourceLoader.getResource("maven://com.abc:test:1.0-SNAPSHOT");
DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "cvaRmaStep",taskRepository);
List<String> commandLineArgs = new ArrayList<>(3);
commandLineArgs.add("--spring.profiles.active=worker");
commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
commandLineArgs.add("--spring.batch.initializer.enabled=false");
partitionHandler.setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
SimpleEnvironmentVariablesProvider environmentVariablesProvider = new SimpleEnvironmentVariablesProvider(this.environment);
partitionHandler.setEnvironmentVariablesProvider(environmentVariablesProvider);
partitionHandler.setMaxWorkers(3);
partitionHandler.setApplicationName("CVAJob");
return partitionHandler;
}
@Bean
@StepScope
public StepExecutionListner stepExecutionListner() {
return new StepExecutionListner();
}
}
Ниже приведена конфигурация БД:
/**
 * Database-related configuration: the batch {@link DataSource}, the Spring
 * Batch infrastructure exposed through {@link BatchConfigurer}, and the Spring
 * Cloud Task configurer bound to the same data source.
 */
@Configuration
public class DbConfig implements BatchConfigurer {

    /**
     * Prefix of the Spring Batch metadata tables. Shared by the job repository
     * and the job explorer so that both read and write the same tables.
     */
    private static final String TABLE_PREFIX = "SCHEMA.BATCH_";

    /**
     * Primary data source, bound to the {@code spring.datasource.*} properties.
     *
     * @return the data source registered as {@code batchDataSource}
     */
    @ConfigurationProperties(prefix = "spring.datasource")
    @Bean(name = "batchDataSource")
    @Primary
    public DataSource dataSource() {
        return DataSourceBuilder.create().build();
    }

    /**
     * Builds the job repository for an Oracle database, using
     * {@code ISOLATION_READ_COMMITTED} when creating job executions.
     *
     * @return the job repository
     * @throws Exception if the factory bean cannot create the repository
     */
    @Override
    public JobRepository getJobRepository() throws Exception {
        JobRepositoryFactoryBean factoryBean = new JobRepositoryFactoryBean();
        factoryBean.setDatabaseType("ORACLE");
        factoryBean.setDataSource(dataSource());
        factoryBean.setTransactionManager(getTransactionManager());
        factoryBean.setIsolationLevelForCreate("ISOLATION_READ_COMMITTED");
        factoryBean.setTablePrefix(TABLE_PREFIX);
        return factoryBean.getObject();
    }

    /**
     * Creates a transaction manager on top of the batch data source.
     *
     * @return a new {@link DataSourceTransactionManager}
     * @throws Exception declared by the {@link BatchConfigurer} contract
     */
    @Override
    public PlatformTransactionManager getTransactionManager() throws Exception {
        return new DataSourceTransactionManager(dataSource());
    }

    /**
     * Creates a {@link SimpleJobLauncher} backed by {@link #getJobRepository()}.
     *
     * @return the initialized job launcher
     * @throws Exception if the launcher or its repository cannot be initialized
     */
    @Override
    public JobLauncher getJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(getJobRepository());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

    /**
     * Creates the job explorer on the batch data source.
     *
     * <p>The explorer uses the same table prefix as the job repository. The
     * original left the prefix unset here, so the explorer was configured
     * differently from the repository.
     *
     * @return the job explorer
     * @throws Exception if the factory bean cannot create the explorer
     */
    @Override
    public JobExplorer getJobExplorer() throws Exception {
        JobExplorerFactoryBean factory = new JobExplorerFactoryBean();
        factory.setDataSource(dataSource());
        factory.setTablePrefix(TABLE_PREFIX);
        factory.afterPropertiesSet();
        return factory.getObject();
    }

    /**
     * Task configurer that uses the batch data source.
     *
     * @param batchDataSource the data source registered as {@code batchDataSource}
     * @return a {@link DefaultTaskConfigurer} using that data source
     */
    @Bean
    public TaskConfigurer taskConfigurer(
            @Qualifier("batchDataSource") DataSource batchDataSource) {
        return new DefaultTaskConfigurer(batchDataSource);
    }
}
Как реализовать мой сценарий использования с помощью удалённого разбиения?