SCDF - создание таблиц метаданных в другой схеме и работа пакетного задания с другой схемой - PullRequest
1 голос
/ 04 мая 2020

Я использую Spring Batch, который загружает данные из Oracle и помещает их в MongoDB. Я хочу использовать Spring Cloud Data Flow, но SCDF не поддерживает MongoDB.

Есть ли способ сохранять метаданные в Postgres (так как он хорошо поддерживается) или в H2 (в этом примере я использовал H2) и при этом запускать сами пакетные задания, которые загружают данные из Oracle в MongoDB?

Когда я попытался запустить такую комбинацию, я получил приведённую ниже ошибку; очевидно, в конфигурации нужно что-то подправить. Есть ли быстрый обходной путь?

Ошибка:

  org.springframework.batch.item.ItemStreamException: Failed to initialize the reader
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:152) ~[spring-batch-infrastructure-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader$$FastClassBySpringCGLIB$$ebb633d0.invoke(<generated>) ~[spring-batch-infrastructure-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749) ~[spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136) ~[spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124) ~[spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688) ~[spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
    at org.springframework.batch.item.database.JdbcCursorItemReader$$EnhancerBySpringCGLIB$$3c62d122.open(<generated>) ~[spring-batch-infrastructure-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
    at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:103) ~[spring-batch-infrastructure-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:311) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) [spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
    at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:399) [spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
    at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:135) [spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:313) [spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:144) [spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) [spring-core-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:137) [spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_171]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_171]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343) [spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) [spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
    at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) [spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) [spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
    at com.sun.proxy.$Proxy66.run(Unknown Source) [na:na]
    at com.mastercard.customer.data.management.refdata.ReferenceMongoBatchApplication.run(ReferenceMongoBatchApplication.java:63) [classes!/:1.0]
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:813) [spring-boot-2.1.3.RELEASE.jar!/:2.1.3.RELEASE]
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:797) [spring-boot-2.1.3.RELEASE.jar!/:2.1.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:324) [spring-boot-2.1.3.RELEASE.jar!/:2.1.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) [spring-boot-2.1.3.RELEASE.jar!/:2.1.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) [spring-boot-2.1.3.RELEASE.jar!/:2.1.3.RELEASE]
    at com.mastercard.customer.data.management.refdata.ReferenceMongoBatchApplication.main(ReferenceMongoBatchApplication.java:51) [classes!/:1.0]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_171]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_171]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
    at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:48) [reference-mongo-batch-1.0.jar:1.0]
    at org.springframework.boot.loader.Launcher.launch(Launcher.java:87) [reference-mongo-batch-1.0.jar:1.0]
    at org.springframework.boot.loader.Launcher.launch(Launcher.java:50) [reference-mongo-batch-1.0.jar:1.0]
    at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:51) [reference-mongo-batch-1.0.jar:1.0]

application.properties

# ORACLE DATASOURCE
# JDBC connection settings for the Oracle database (host and service name are masked).
spring.datasource.url=jdbc:oracle:thin:@//XXXXXX:1527/XXXX
spring.datasource.username=user
spring.datasource.password=password
spring.datasource.driver-class-name=oracle.jdbc.OracleDriver
# NOTE(review): Oracle normally rejects a bare "select 1" (it expects "select 1 from dual");
# confirm that this query is valid for the target database and that the connection pool in
# use actually reads this property.
spring.datasource.validation-query= select 1
# Pool validation timeout override, currently commented out.
#spring.datasource.hikari.validationTimeout=30000

# MONGODB
# Connection settings for the MongoDB instance (host, port and database name).
spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017
spring.data.mongodb.database=mgdb_drefdata_99

Конфиг

/**
 * Spring Batch configuration that assembles the {@code readEmployeeJob} job and its single
 * chunk-oriented step from the injected reader, asynchronous processor and asynchronous writer.
 */
@Configuration
public class EmployeeJob {
    /**
     * Number of items per chunk, bound from the {@code spring.chunk.size} property.
     * Injected as an {@code int} so the conversion is done by Spring instead of by hand.
     */
    @Value("${spring.chunk.size}")
    private int chunkSize;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /** Cursor-based JDBC reader that supplies {@code Employee} items to the step. */
    @Autowired
    private JdbcCursorItemReader<Employee> EmployeeReader;

    /** Processor wrapper that hands each item to a task executor and yields a future. */
    @Autowired
    public AsyncItemProcessor<Employee, Employee> asyncItemProcessor;

    /** Writer wrapper that consumes the futures produced by {@link #asyncItemProcessor}. */
    @Autowired
    public AsyncItemWriter<Employee> asyncItemWriter;

    /**
     * Exposes the step execution listener as a bean.
     * Note that this class does not attach it to {@link #EmployeeStepOne()} itself.
     *
     * @return a new listener instance
     */
    @Bean
    public EmployeeStepExecuListner EmployeeStepExecuListner() {
        return new EmployeeStepExecuListner();
    }

    /**
     * Builds the primary job, consisting of the single step {@link #EmployeeStepOne()}.
     * A {@link RunIdIncrementer} is registered as the job parameters incrementer.
     *
     * @return the {@code readEmployeeJob} job
     */
    @Bean("readEmployeeJob")
    @Primary
    public Job readEmployeeJob() {
        return jobBuilderFactory.get("readEmployeeJob")
                .incrementer(new RunIdIncrementer())
                .start(EmployeeStepOne())
                .build();
    }

    /**
     * Builds the chunk-oriented step that reads, processes and writes {@code Employee} items.
     *
     * <p>The chunk output type is declared as {@code Future<Employee>} because the asynchronous
     * processor produces futures and the asynchronous writer consumes them. Declaring it this
     * way keeps the step fully type-checked, with no raw-type cast or warning suppression.
     *
     * @return the {@code EmployeeStepOne} step
     */
    @Bean
    public Step EmployeeStepOne() {
        return stepBuilderFactory.get("EmployeeStepOne")
                .<Employee, java.util.concurrent.Future<Employee>>chunk(chunkSize)
                .reader(EmployeeReader)
                .processor(asyncItemProcessor)
                .writer(asyncItemWriter)
                .build();
    }
}

Конфиг

/**
 * Declares the reader, processor and writer beans used by the employee batch step, together
 * with the asynchronous wrappers around the processor and the writer.
 */
@Configuration
public class EmployeeBatchConfig {

    /** Data source qualified as {@code oracleDS}; the reader runs its query against it. */
    @Autowired
    @Qualifier("oracleDS")
    private DataSource dataSource;

    /**
     * Creates the step-scoped cursor reader that runs {@code SELECT_Employee_SQL} and maps each
     * row with {@link EmployeeRowMapper}.
     *
     * <p>The destroy method is set to an empty string, so no destroy callback is registered
     * for this bean.
     *
     * @return the configured and validated reader
     * @throws Exception if the reader's property validation fails
     */
    @Bean(destroyMethod = "")
    @StepScope
    public JdbcCursorItemReader<Employee> EmployeeReader() throws Exception {
        final JdbcCursorItemReader<Employee> cursorReader = new JdbcCursorItemReader<>();
        cursorReader.setRowMapper(new EmployeeRowMapper());
        cursorReader.setSql(SELECT_Employee_SQL);
        cursorReader.setDataSource(dataSource);
        cursorReader.afterPropertiesSet();
        return cursorReader;
    }

    /**
     * Creates the synchronous item processor that the asynchronous wrapper delegates to.
     *
     * @return a new {@link EmployeeProcessor}
     */
    @Bean
    public ItemProcessor<Employee, Employee> EmployeeProcessor() {
        return new EmployeeProcessor();
    }

    /**
     * Wraps {@link #EmployeeProcessor()} in an {@link AsyncItemProcessor} backed by a
     * {@link SimpleAsyncTaskExecutor}.
     *
     * @return the configured asynchronous processor
     * @throws Exception if the processor's property validation fails
     */
    @Bean
    public AsyncItemProcessor<Employee, Employee> asyncItemProcessor() throws Exception {
        final AsyncItemProcessor<Employee, Employee> wrapper = new AsyncItemProcessor<>();
        wrapper.setTaskExecutor(new SimpleAsyncTaskExecutor());
        wrapper.setDelegate(EmployeeProcessor());
        wrapper.afterPropertiesSet();
        return wrapper;
    }

    /**
     * Creates the synchronous item writer that the asynchronous wrapper delegates to.
     *
     * @return a new {@link EmployeeWriter}
     */
    @Bean
    public EmployeeWriter EmployeeWriter() {
        return new EmployeeWriter();
    }

    /**
     * Wraps {@link #EmployeeWriter()} in an {@link AsyncItemWriter}.
     *
     * @return the configured asynchronous writer
     * @throws Exception if the writer's property validation fails
     */
    @Bean
    public AsyncItemWriter<Employee> asyncItemWriter() throws Exception {
        final AsyncItemWriter<Employee> wrapper = new AsyncItemWriter<>();
        wrapper.setDelegate(EmployeeWriter());
        wrapper.afterPropertiesSet();
        return wrapper;
    }
}
...