Класс DeployerPartitionHandler, выбрасывающий исключение нулевого указателя для нескольких шагов удаленного раздела - PullRequest
1 голос
/ 11 марта 2020

Я использую Spring Batch Spring Cloud для автоматизации чтения / записи данных (чтение из файла и сохранение в MongoDb). В моем случае использования у меня есть 2 (добавит еще 1 шаг после успешного достижения 2) шагов. Я пытаюсь использовать удаленное разбиение интегрировать весеннюю облачную задачу DeployerPartitionHandler. java мастер-класс класса для подчиненного узла в качестве посредника, это то, что я понимаю вместо использования весенней интеграции activemg / rabbitmq. Я создал 2 компонента-обработчика и два обработчика для двух моих шагов. Ниже приведен пример кода. Я получаю исключение ниже.

2020-03-11 12:03:59 - o.s.batch.core.step.AbstractStep - Encountered an error executing step step1 in job Job669228617
java.lang.NullPointerException: null
        at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.launchWorker(DeployerPartitionHandler.java:347)
        at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.launchWorkers(DeployerPartitionHandler.java:313)
        at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.handle(DeployerPartitionHandler.java:302)
        at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:106)
        at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208)
        at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
        at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410)
        at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136)
        at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319)
        at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147)
        at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
        at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
        at com.sun.proxy.$Proxy77.run(Unknown Source)
        at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.execute(JobLauncherCommandLineRunner.java:192)
@Configuration
@ComponentScan(basePackageClasses = DbConfig.class)
public JobConfig {

@Autowired
private TaskLauncher taskLauncher;
@Autowired
private JobExplorer jobExplorer;
@Autowired
private TaskRepository taskRepository;
@Autowired
private Reader1 reader2;
@Autowired
private Writer2 writer2;
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;

@Autowired
private DelegatingResourceLoader resourceLoader;

@Autowired
private ConfigurableApplicationContext context;

@Autowired
public JobRepository jobRepository;
@Autowired
private Environment environment;

private static final int GRID_SIZE = 2;
@Autowired
private Reader1 reader1;
@Autowired
private Writer2 writer2;
@Autowired
@Qualifier("partitionHandler1")
private PartitionHandler partitionHandler1;
@Autowired
@Qualifier("partitionHandler2")
private PartitionHandler partitionHandler2;
@Bean
@Profile("master")
public Job masterJob()  {
    Random random = new Random();
    return this.jobBuilderFactory.get("masterJob" + random.nextInt())
            .start(step1())
            .next(step2())
            .build();
}
@Bean
@Profile("master")
public Step step1()  {
        return this.stepBuilderFactory.get("step1")
            .partitioner("slaveStep1", partitioner1())
            .partitionHandler(partitionHandler1)
            .taskExecutor(taskExecutor())
            .build();
}
@Bean
@Profile("master")
public Step step2() {
    return this.stepBuilderFactory.get("step2")
            .partitioner("slaveStep2",partitioner2())
            .partitionHandler(partitionHandler2)
            .taskExecutor(taskExecutor())
            .build();
}
@Bean
@Profile("worker")
public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
    return new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);
}
@Bean
public Step slaveStep1() {
    return this.stepBuilderFactory.get("slaveStep1")
            .<Domain1, Domain1>chunk(50)
            .reader(reader1)
            .writer(writer1)
            .listener(stepExecutionListner())
            .build();
}
@Bean
public Step slaveStep2() {
    return this.stepBuilderFactory.get("slaveStep2")
            .<Domain2, Domain2>chunk(50)
            .reader(reader2)
            .writer(writer2)
            .listener(stepExecutionListner())
            .build();
}
@Bean
public Partitioner partitioner1() {
    FilePartitioner filePartitioner = new FilePartitioner("classpath:input/test1*.csv");
    return filePartitioner.getFilesPartitioner();
}
@Bean
public Partitioner partitioner2() {
    FilePartitioner filePartitioner = new FilePartitioner("classpath:input/test2*.csv");
    return filePartitioner.getFilesPartitioner();
}
@Bean(name="partitionHandler1")
public PartitionHandler partitionHandler1(TaskLauncher taskLauncher,
                                                   JobExplorer jobExplorer, TaskRepository taskRepository) {
    Resource resource = this.resourceLoader.getResource("maven://com.abc:test:1.0-SNAPSHOT");
    DeployerPartitionHandler partitionHandler =
            new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "ormBusUnitLoaderStep",taskRepository);
    List<String> commandLineArgs = new ArrayList<>(3);
    commandLineArgs.add("--spring.profiles.active=worker");
    commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
    commandLineArgs.add("--spring.batch.initializer.enabled=false");
    partitionHandler.setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
    SimpleEnvironmentVariablesProvider environmentVariablesProvider = new SimpleEnvironmentVariablesProvider(this.environment);
    partitionHandler.setEnvironmentVariablesProvider(environmentVariablesProvider);
    partitionHandler.setMaxWorkers(3);
    partitionHandler.setApplicationName("Job");
    return partitionHandler;
}
@Bean(name="partitionHandler2")
//@Scope(value = "prototype")
public PartitionHandler partitionHandler2(TaskLauncher taskLauncher,
                                               JobExplorer jobExplorer, TaskRepository taskRepository) {
    Resource resource = this.resourceLoader.getResource("maven://com.abc:test:1.0-SNAPSHOT");
    DeployerPartitionHandler partitionHandler =
            new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "cvaRmaStep",taskRepository);
    List<String> commandLineArgs = new ArrayList<>(3);
    commandLineArgs.add("--spring.profiles.active=worker");
    commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
    commandLineArgs.add("--spring.batch.initializer.enabled=false");
    partitionHandler.setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
    SimpleEnvironmentVariablesProvider environmentVariablesProvider = new SimpleEnvironmentVariablesProvider(this.environment);
    partitionHandler.setEnvironmentVariablesProvider(environmentVariablesProvider);
    partitionHandler.setMaxWorkers(3);
    partitionHandler.setApplicationName("CVAJob");
    return partitionHandler;
}
@Bean
@StepScope
public StepExecutionListner stepExecutionListner() {
    return new StepExecutionListner();
}
}

Ниже приведена конфигурация БД

@Configuration
public class DbConfig implements BatchConfigurer {
    @ConfigurationProperties(prefix = "spring.datasource")
    @Bean(name="batchDataSource")
    @Primary
    public DataSource dataSource() {
       return DataSourceBuilder.create().build();
    }

    @Override
    public JobRepository getJobRepository() throws Exception {
       JobRepositoryFactoryBean factoryBean = new JobRepositoryFactoryBean();
       factoryBean.setDatabaseType("ORACLE");
       factoryBean.setDataSource(dataSource());
       factoryBean.setTransactionManager(getTransactionManager());
       factoryBean.setIsolationLevelForCreate("ISOLATION_READ_COMMITTED");
       factoryBean.setTablePrefix("SCHEMA.BATCH_");
       return factoryBean.getObject();
    }

    @Override
    public PlatformTransactionManager getTransactionManager() throws Exception {
       return new DataSourceTransactionManager(dataSource());
   }

   @Override
   public JobLauncher getJobLauncher() throws Exception {
       SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
       jobLauncher.setJobRepository(getJobRepository());
       jobLauncher.afterPropertiesSet();
       return jobLauncher;
   }

   @Override
   public JobExplorer getJobExplorer() throws Exception {
      JobExplorerFactoryBean factory = new JobExplorerFactoryBean();
      factory.setDataSource(dataSource());
      factory.afterPropertiesSet();
      return factory.getObject();
   }

   @Bean
   public TaskConfigurer taskConfigurer(
      @Qualifier("batchDataSource")DataSource batchDataSource){
       return new DefaultTaskConfigurer(batchDataSource);
   }
   }

Как выполнить sh мой сценарий использования с помощью удаленного разбиения?

...