Я разрабатываю Spring Boot Spring Batch код. Чтение данных из Oracle DB
и загрузка всех данных в MongoDB (NOSQL DB)
. Моделирование MongoDB
разработано как ненормализованный способ согласно стандартному способу реализации отношений / моделирования монго.
У меня есть TableA
и TableB
таблица и таблица соединения TableAB
между ними, которая является 3-ей таблицей. Когда я читаю TableA
Table через JdbcCursorItemReader<TableA>
это время для каждого PK Id из TableA
, мне нужно запросить к SubDivision
Table, чтобы получить все SubDivision
для TableA
PK и заполните SubDivision Data, установите его в модель TableA. Модель TableA имеет список SubDivisions
.
Единственный способ, которым я вижу, как сделать запрос из TableAProcessor
и установить данные в модель TableA
, его легко реализовать, но проблема в том, что он делает вызовы по 100 КБ из БД из TableAProcess, если у меня 100K TableA records .
Как я могу добиться этого и установить данные SubDivision
для модели TableA
с помощью Tasklet или любым другим способом?
Как избежать вызова столь большого количества запросов от Процессора?
Я не могу сделать один запрос из-за некоторых ограничений, поэтому мне нужно запросить еще один запрос к БД, чтобы получить данные SubDivision.
@Slf4j
public class TableAProcessor implements ItemProcessor<TableA, TableA>{
@Autowired
private TableADao tableADao;
@Override
public TableA process(TableA tableA) throws Exception {
log.debug("TableA DETAILS : "+tableA);
List<SubDivision> subDivisions = tableADao.getSubDivision(tableA.getPKId());
tableA.setSubDivisions(subDivisions);
return tableA;
}
}
Модель
public class TableA {
@Transient
private Integer Id;
@Field
private String mongoId;
........
.......
@Field
private List<SubDivision> subDivisions;
}
TableABatchConfig.java
@Configuration
public class TableABatchConfig {
private static final String sql = "SELECT * FROM TABLEA";
@Autowired
@Qualifier(value="oracleDataSource")
private DataSource dataSource;
@Bean(destroyMethod = "")
@StepScope
public JdbcCursorItemReader<TableA> TableAReader() throws Exception {
JdbcCursorItemReader<TableA> reader = new JdbcCursorItemReader<TableA>();
reader.setDataSource(this.dataSource);
reader.setSql(sql);
reader.setRowMapper(new TableARowMapper());
reader.afterPropertiesSet();
return reader;
}
@Bean
public ItemProcessor<TableA, TableA> TableAProcessor() {
return new TableAProcessor();
}
@Bean
public TableAWriter TableAWriter() {
return new TableAWriter();
}
}
TableAJob.java
@Configuration
@PropertySource("classpath:application.properties")
public class TableAJob {
@Value( "${spring.chunk.size}")
private String chunkSize;
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private JdbcCursorItemReader<TableA> TableAReader;
@Autowired
private ItemProcessor<TableA, TableA> TableAProcessor;
@Autowired
private TableAWriter TableAWriter;
@Bean
public TableAStepExecuListner TableAStepExecuListner() {
return new TableAStepExecuListner();
}
@Bean("readTableAJob")
@Primary
public Job readTableAJob() {
return jobBuilderFactory.get("readTableAJob")
.incrementer(new RunIdIncrementer())
.start(TableAStepOne())
.build();
}
@Bean
public Step TableAStepOne() {
return stepBuilderFactory.get("TableAStepOne")
.<TableA, TableA>chunk(Integer.parseInt(chunkSize))
.reader(TableAReader)
.processor(TableAProcessor)
.writer(TableAWriter)
.listener(TableAStepExecuListner())
.build();
}
}
* * Дао тысячи сорок-девять
@Service
public class TableADao {
private static final String SQL = "COMPLEX JOIN QUERY";
@Autowired
private JdbcTemplate jdbcTemplate;
public List<SubDivision> getSubDivision(Integer pkId){
List<Map<String, Object>> results = jdbcTemplate.queryForList(SQL,new Object[] { pkId });
List<SubDivision> divisions = new ArrayList<>();
for (Map<String, Object> row : results) {
divisions.add(SubDivision.builder().subDivisionCd((String)row.get("SUBDIVISION_CD"))
......
.........
.........
......
.build());
}
return divisions;
}
}
TableAWriter.java
public class TableAWriter implements ItemWriter<TableA>{
@Autowired
private TableARepository TableARepository;
@Override
public void write(List<? extends TableA> items) throws Exception {
TableARepository.saveAll(items);
}
}