Spring Batch - Как сделать два запроса или передать два объекта процессору или устройству записи? - PullRequest
0 голосов
/ 08 апреля 2019

Я разрабатываю Spring Boot Spring Batch код. Чтение данных из Oracle DB и загрузка всех данных в MongoDB (NOSQL DB). Моделирование MongoDB разработано как ненормализованный способ согласно стандартному способу реализации отношений / моделирования монго.

У меня есть TableA и TableB таблица и таблица соединения TableAB между ними, которая является 3-ей таблицей. Когда я читаю TableA Table через JdbcCursorItemReader<TableA> это время для каждого PK Id из TableA, мне нужно запросить к SubDivision Table, чтобы получить все SubDivision для TableA PK и заполните SubDivision Data, установите его в модель TableA. Модель TableA имеет список SubDivisions.

Единственный способ, которым я вижу, как сделать запрос из TableAProcessor и установить данные в модель TableA, его легко реализовать, но проблема в том, что он делает вызовы по 100 КБ из БД из TableAProcess, если у меня 100K TableA records .

Как я могу добиться этого и установить данные SubDivision для модели TableA с помощью Tasklet или любым другим способом?

Как избежать вызова столь большого количества запросов от Процессора?

Я не могу сделать один запрос из-за некоторых ограничений, поэтому мне нужно запросить еще один запрос к БД, чтобы получить данные SubDivision.

@Slf4j
public class TableAProcessor implements ItemProcessor<TableA, TableA>{

    @Autowired
    private TableADao tableADao;

    @Override
    public TableA process(TableA tableA) throws Exception {
        log.debug("TableA DETAILS : "+tableA);
        List<SubDivision> subDivisions = tableADao.getSubDivision(tableA.getPKId());
        tableA.setSubDivisions(subDivisions);
        return tableA;
    }
}

Модель

public class TableA {
    @Transient
    private Integer Id;
    @Field
    private String mongoId;
    ........
    .......
    @Field
    private List<SubDivision> subDivisions;
}

TableABatchConfig.java

@Configuration
public class TableABatchConfig {

    private static final String sql = "SELECT * FROM TABLEA";

    @Autowired
    @Qualifier(value="oracleDataSource")
    private DataSource dataSource;

    @Bean(destroyMethod = "")
    @StepScope
    public JdbcCursorItemReader<TableA> TableAReader() throws Exception {
        JdbcCursorItemReader<TableA> reader = new JdbcCursorItemReader<TableA>();
        reader.setDataSource(this.dataSource);
        reader.setSql(sql);

        reader.setRowMapper(new TableARowMapper());
        reader.afterPropertiesSet();
        return reader;
    }

    @Bean
    public ItemProcessor<TableA, TableA> TableAProcessor() {
        return new TableAProcessor();
    }

    @Bean
    public TableAWriter TableAWriter() {
        return new TableAWriter();
    }
}

TableAJob.java

@Configuration
@PropertySource("classpath:application.properties")
public class TableAJob {
    @Value( "${spring.chunk.size}")
    private String chunkSize;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JdbcCursorItemReader<TableA> TableAReader;

    @Autowired
    private ItemProcessor<TableA, TableA> TableAProcessor;

    @Autowired
    private TableAWriter TableAWriter;

    @Bean
    public TableAStepExecuListner TableAStepExecuListner() {
        return new TableAStepExecuListner();
    }

    @Bean("readTableAJob")
    @Primary
    public Job readTableAJob() {
        return jobBuilderFactory.get("readTableAJob")
                .incrementer(new RunIdIncrementer())
                .start(TableAStepOne())
                .build();
    }

    @Bean
    public Step TableAStepOne() {
        return stepBuilderFactory.get("TableAStepOne")
                .<TableA, TableA>chunk(Integer.parseInt(chunkSize))
                .reader(TableAReader)
                .processor(TableAProcessor)
                .writer(TableAWriter)
                .listener(TableAStepExecuListner())
                .build();
    }
}
* * Дао тысячи сорок-девять
@Service
public class TableADao {

    private static final String SQL = "COMPLEX JOIN QUERY";

    @Autowired
    private JdbcTemplate jdbcTemplate;

    public List<SubDivision> getSubDivision(Integer pkId){
        List<Map<String, Object>> results = jdbcTemplate.queryForList(SQL,new Object[] { pkId });

        List<SubDivision> divisions = new ArrayList<>();
        for (Map<String, Object> row : results) {
            divisions.add(SubDivision.builder().subDivisionCd((String)row.get("SUBDIVISION_CD"))
                    ......
                    .........
                    .........
                    ......
                    .build());
        }
        return divisions;
    }
}

TableAWriter.java

public class TableAWriter implements ItemWriter<TableA>{
    @Autowired
    private TableARepository TableARepository;

    @Override
    public void write(List<? extends TableA> items) throws Exception {
        TableARepository.saveAll(items);
    }
}
...