У меня есть пакетное приложение Spring, которое странно, потому что я не знаю, почему не обрабатывает все элементы, я использую разделение диапазона и составной процессор для преобразования данных. Если мой читатель, например, читает 5787 записей, это пример, потому что их может быть больше, он обрабатывает только 5704 записи, а остальные остаются необработанными. Надеюсь, кто-то может мне помочь, заранее спасибо.
Мой процессор данных
public class data implements ItemProcessor<beangenerico,ThreadLocal<List<beanAccountCollect>>> {
Logger logger = Logger.getLogger(data.class);
private String SP_SQL = "{call GetDetailAccount(?)}";
private String SELECT = "{call myspbyblocks (?,?)}";
private beanAccountCollect b;
private ThreadLocal<List<beanAccountCollect>> listbeanAccC = new ThreadLocal<List<beanAccountCollect>>();
private ThreadLocal<List<beanCustomer>> listbeanc=new ThreadLocal<List<beanCustomer>>();
@Autowired
private JdbcTemplate jdbcTemplate;
@Override
public ThreadLocal<List<beanAccountCollect>> process(beangenerico rangos) {
// TODO Auto-generated method stub
listbeanAccC.set(new ArrayList<beanAccountCollect>());
try {
listbeanc = this.jdbcTemplate.query(SELECT,new Object [] {rangos.getIni(),rangos.getFin()},new CustomerResultSetExtractor());
for(beanCustomer bc : listbeanc.get()) {
b = new beanAccountCollect();
b.setUsernetwork(bc.getUsernetwork());
b.setTipoagente(bc.getTipoagente());
b.setLbpar(this.jdbcTemplate.query(SP_SQL,new Object [] {bc.getCuenta()},new BeanAccountResulSetExtractor(this.jdbcTemplate)));
listbeanAccC.get().add(b);
}
}catch (Exception e) {
logger.error(e);
}
return listbeanAccC;
}
public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
}
Это мои данные преобразования
public class transformDataWS implements ItemProcessor<ThreadLocal<List<beanAccountCollect>>, ThreadLocal<List<beanNewMessageBeanP>>>
{
Logger logger = Logger.getLogger(transformDataWS.class);
private ThreadLocal<List<beanNewMessageBeanP>> lstbnmbp = new ThreadLocal<List<beanNewMessageBeanP>>();
private beanNewMessageBeanP bnmbp;
public ThreadLocal<List<beanNewMessageBeanP>> process(ThreadLocal<List<beanAccountCollect>> list) throws Exception {
// TODO Auto-generated method stub
lstbnmbp.set(new ArrayList<beanNewMessageBeanP>());
List<beanParameter> lbeanPar=null;
List<NMessagePEntryBeanParray> lNMPEBa = null;
for(beanAccountCollect bc:list.get()) {
NewMessageParametrosEntryBeanP nb = new NewMessageParametrosEntryBeanP();
NMessagePEntryBeanParray bar= null;
NewMessageParametrosEntryBeanP [] ba = null;
lNMPEBa = new ArrayList<NMessagePEntryBeanParray>();
lbeanPar = new ArrayList<beanParameter>();
lbeanPar = bc.getLbpar();
bnmbp = new beanNewMessageBeanP();
bnmbp.setTipoagente(bc.getTipoagente());
bnmbp.setUsernetwork(bc.getUsernetwork());
if(lbeanPar!=null) {
for(beanParameter bpar : lbeanPar) {
ba = new NewMessageParametrosEntryBeanP[54];
bar = new NMessagePEntryBeanParray();
ba[0] = new NewMessageParametrosEntryBeanP();
ba[0].setKey("PaymentQ");
ba[0].setValue(bpar.getPQ());
ba[1] = new NewMessageParametrosEntryBeanP();
ba[1].setKey("PaymentReest");
ba[1].setValue(bpar.getPR());
ba[2] = new NewMessageParametrosEntryBeanP();
ba[2].setKey("DelayCte");
ba[2].setValue(bpar.getDelayCte());
ba[3] = new NewMessageParametrosEntryBeanP();
ba[3].setKey("DelayRange");ba[3].setValue(bpar.getDelayR());
ba[4] = new NewMessageParametrosEntryBeanP();
ba[4].setKey("C4");ba[4].setValue(bpar.getC3());
ba[5] = new NewMessageParametrosEntryBeanP();
ba[5].setKey("C6");ba[5].setValue(bpar.getC6());
ba[6] = new NewMessageParametrosEntryBeanP();
ba[6].setKey("Banddict");ba[6].setValue(bpar.getBanddict());
ba[7] = new NewMessageParametrosEntryBeanP();
ba[7].setKey("Street");ba[7].setValue(bpar.getStreet());
ba[8] = new NewMessageParametrosEntryBeanP();
ba[8].setKey("Stree_1");ba[8].setValue(bpar.getStreet1());
//....
ba[53] = new NewMessageParametrosEntryBeanP();
ba[53].setKey("Zone");ba[53].setValue(bpar.getZone());
bar.setArr(ba);
lNMPEBa.add(bar);
}
}
bnmbp.setNmespebarr(lNMPEBa);
lstbnmbp.get().add(bnmbp);
}
return lstbnmbp;
}
}
Это мое задание конфигурации
@EnableBatchProcessing
@Configuration
@Import({DBConfiguration.class})
@ComponentScan({"com.mycompany.batch.config","com.mycompany.batch.mapper","com.mycompany.batch.model","com.mycompany.batch.particion","com.mycompan.batch.procesos","com.mycompany.batch.reader","com.mycompany.batch.writers"})
@PropertySource("file:pruebas.properties")
public class ConfigJobBatch {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
@Qualifier("sqlserverDataSource")
private DataSource dataSource;
@Autowired
Environment envws;
@Bean(name = "demoPartitionStep")
public Step step1Manager(Step slaveStep) {
return stepBuilderFactory.get("step1.manager")
.<String, String>partitioner("step1", demoPartitioner())
.step(slaveStep)
.gridSize(numerohilos())
.taskExecutor(taskExecutor())
.build();
}
@Bean(name = "demoPartitioner", destroyMethod = "")
public Partitioner demoPartitioner() {
RangePartitioner partitioner = new RangePartitioner();
return partitioner;
}
// slave step
@Bean
public Step slaveStep(ItemReader<beangenerico> demoReader,ItemWriter BeanAccCollectionWriter)
{
return stepBuilderFactory.get("slaveStep")
.chunk(1)
.reader(demoReader)
.processor(compositeProcessor())
.writer(BeanAccCollectionWriter)
.taskExecutor(taskExecutor())
.build();
}
@Bean
public CompositeItemProcessor compositeProcessor() {
List<ItemProcessor> delegates = new ArrayList<>(2);
delegates.add(CustomerProccesor());
delegates.add(beanDataItemProccesor());
CompositeItemProcessor processor = new CompositeItemProcessor();
processor.setDelegates(delegates);
return processor;
}
/***FIXME debemos instanciar los processor como spring bean sino el spring no lee y no toma en cuenta la capa dao o service***/
@Bean
public CustomerItemProcessor CustomerProccesor(){
return new CustomerItemProcessor();
}
@Bean
public beanDataItemProccesor beanDataItemProccesor(){
return new beanDataItemProccesor();
}
/***FIXME debemos instanciar los processor como bean sino no toma en cuenta la capa dao o service***/
@Bean
public CustomItemProcessListener listener() {
return new CustomItemProcessListener();
}
@Bean(name = "demoWriter")
@StepScope
public ItemWriter< beangenerico> CustomItemWriter() {
// TODO Auto-generated method stub
CustomItemWriter wri = new CustomItemWriter();
return wri;
}
@Bean(name = "testWriter")
@StepScope
public ItemWriter<ThreadLocal<CopyOnWriteArrayList<beangen>>> testItemWriter() {
// TODO Auto-generated method stub
TestWriter wri = new TestWriter();
return wri;
}
@Bean(name = "BeanAccCollectionWriter")
@StepScope
public ItemWriter<ThreadLocal<List<NewMessage>>> BeanAccItemWriter() {
// TODO Auto-generated method stub
BeanAccItemWriter wri = new BeanAccItemWriter();
return wri;
}
@Bean(name="flatFileItemWriterPartition")
@StepScope
public FlatFileItemWriter<beangen> slaveWriter(
@Value("#{stepExecutionContext[fromId]}") int fromId,@Value("#{stepExecutionContext[toId]}")int toId ) {
FlatFileItemWriter<beangen> reader = new FlatFileItemWriter<beangen>();
reader.setResource(new FileSystemResource(
"csv/users.processed" + fromId + "-" + toId + ".csv"));
//reader.setAppendAllowed(false);
reader.setLineAggregator(new DelimitedLineAggregator<beangen>() {{
setDelimiter(",");
setFieldExtractor(new BeanWrapperFieldExtractor<beangen>() {{
setNames(new String[]{"usernetwork","cuenta","atributo","atributo2"});
}});
}});
return reader;
}
@Bean(name="tempRecordsWriter")
@StepScope
public ListDelegateWriter ListDelegateWriter(@Qualifier("flatFileItemWriterPartition")FlatFileItemWriter<beangen> writer) {
// TODO Auto-generated method stub
ListDelegateWriter wri = new ListDelegateWriter();
wri.setDelegate(writer);
return wri;
}
@Bean(name = "demoReader")
@StepScope
public ItemReader<beangenerico> myreader(@Value("#{stepExecutionContext['fromId']}") int minValue,@Value("#{stepExecutionContext['toId']}") int maxValue){
Myreader fr = new Myreader(minValue,maxValue);
return fr;
}
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}
@Bean
public Job job(@Qualifier("demoPartitionStep") Step demoPartitionStep) {
return this.jobBuilderFactory.get("job")
.start(demoPartitionStep)
.build();
}
@Bean
public StepExecuListner steplistener() {
return new StepExecuListner();
}
public static int numerohilos() {
/****ciclo para hilos usando rango y numero de hilos a calcular***
**************N_threads = N_cpu * U_cpu * (1 + W / C) *************************************
***N_cpu = Runtime.getRuntime().availableProcessors()**
*******/
int numcpu = Runtime.getRuntime().availableProcessors();
int numthread = numcpu*1*(1+10);
int gridSize=numthread;
return gridSize;
}
}