Пакетное приложение Spring обрабатывает не все элементы - PullRequest
2 голосов
/ 19 июня 2020

У меня есть пакетное приложение Spring, которое странно, потому что я не знаю, почему не обрабатывает все элементы, я использую разделение диапазона и составной процессор для преобразования данных. Если мой читатель, например, читает 5787 записей, это пример, потому что их может быть больше, он обрабатывает только 5704 записи, а остальные остаются необработанными. Надеюсь, кто-то может мне помочь, заранее спасибо.

Мой процессор данных

public class data implements ItemProcessor<beangenerico,ThreadLocal<List<beanAccountCollect>>> {

    Logger logger = Logger.getLogger(data.class);

    private String SP_SQL = "{call GetDetailAccount(?)}";
    private String SELECT = "{call myspbyblocks (?,?)}";
    private beanAccountCollect b;
    private ThreadLocal<List<beanAccountCollect>> listbeanAccC = new ThreadLocal<List<beanAccountCollect>>();
    private ThreadLocal<List<beanCustomer>> listbeanc=new ThreadLocal<List<beanCustomer>>();

    @Autowired
    private  JdbcTemplate jdbcTemplate;

    @Override
    public ThreadLocal<List<beanAccountCollect>> process(beangenerico rangos)  {
        // TODO Auto-generated method stub

        listbeanAccC.set(new ArrayList<beanAccountCollect>());

        try {
        listbeanc = this.jdbcTemplate.query(SELECT,new Object [] {rangos.getIni(),rangos.getFin()},new CustomerResultSetExtractor());

            for(beanCustomer bc : listbeanc.get())  {

                b = new beanAccountCollect();
                b.setUsernetwork(bc.getUsernetwork());
                b.setTipoagente(bc.getTipoagente());
                b.setLbpar(this.jdbcTemplate.query(SP_SQL,new Object [] {bc.getCuenta()},new BeanAccountResulSetExtractor(this.jdbcTemplate)));

                listbeanAccC.get().add(b);
            }

        }catch (Exception e) {
            logger.error(e);
        }

        return listbeanAccC;
    }

    public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }
}

Это мои данные преобразования

public class transformDataWS implements ItemProcessor<ThreadLocal<List<beanAccountCollect>>, ThreadLocal<List<beanNewMessageBeanP>>>
{

    Logger logger = Logger.getLogger(transformDataWS.class);

    private ThreadLocal<List<beanNewMessageBeanP>> lstbnmbp = new ThreadLocal<List<beanNewMessageBeanP>>();

    private beanNewMessageBeanP bnmbp;

    public ThreadLocal<List<beanNewMessageBeanP>> process(ThreadLocal<List<beanAccountCollect>> list) throws Exception {    

            // TODO Auto-generated method stub
        lstbnmbp.set(new ArrayList<beanNewMessageBeanP>());
        List<beanParameter> lbeanPar=null;
        List<NMessagePEntryBeanParray> lNMPEBa = null;

        for(beanAccountCollect bc:list.get()) {

            NewMessageParametrosEntryBeanP nb = new NewMessageParametrosEntryBeanP();
            NMessagePEntryBeanParray bar= null;
            NewMessageParametrosEntryBeanP [] ba = null;
            lNMPEBa = new ArrayList<NMessagePEntryBeanParray>();
            lbeanPar = new ArrayList<beanParameter>();
            lbeanPar = bc.getLbpar();
            bnmbp = new beanNewMessageBeanP();

            bnmbp.setTipoagente(bc.getTipoagente());
            bnmbp.setUsernetwork(bc.getUsernetwork());


            if(lbeanPar!=null) {

                for(beanParameter bpar : lbeanPar) {

                    ba = new NewMessageParametrosEntryBeanP[54];
                    bar = new NMessagePEntryBeanParray();


                    ba[0] = new NewMessageParametrosEntryBeanP();
                    ba[0].setKey("PaymentQ");
                    ba[0].setValue(bpar.getPQ());

                    ba[1] = new NewMessageParametrosEntryBeanP();
                    ba[1].setKey("PaymentReest");
                    ba[1].setValue(bpar.getPR());

                    ba[2] = new NewMessageParametrosEntryBeanP();
                    ba[2].setKey("DelayCte");
                    ba[2].setValue(bpar.getDelayCte());

                    ba[3] = new NewMessageParametrosEntryBeanP();
                    ba[3].setKey("DelayRange");ba[3].setValue(bpar.getDelayR());

                    ba[4] = new NewMessageParametrosEntryBeanP();
                    ba[4].setKey("C4");ba[4].setValue(bpar.getC3());

                    ba[5] = new NewMessageParametrosEntryBeanP();
                    ba[5].setKey("C6");ba[5].setValue(bpar.getC6());

                    ba[6] = new NewMessageParametrosEntryBeanP();
                    ba[6].setKey("Banddict");ba[6].setValue(bpar.getBanddict());

                    ba[7] = new NewMessageParametrosEntryBeanP();
                    ba[7].setKey("Street");ba[7].setValue(bpar.getStreet());

                    ba[8] = new NewMessageParametrosEntryBeanP();
                    ba[8].setKey("Stree_1");ba[8].setValue(bpar.getStreet1());

                    //....

                    ba[53] = new NewMessageParametrosEntryBeanP();
                    ba[53].setKey("Zone");ba[53].setValue(bpar.getZone());

                    bar.setArr(ba);
                    lNMPEBa.add(bar);
                }
            }
            bnmbp.setNmespebarr(lNMPEBa);

            lstbnmbp.get().add(bnmbp);
        }
         return lstbnmbp;
    }
}

Это мое задание конфигурации

@EnableBatchProcessing
@Configuration
@Import({DBConfiguration.class})
@ComponentScan({"com.mycompany.batch.config","com.mycompany.batch.mapper","com.mycompany.batch.model","com.mycompany.batch.particion","com.mycompan.batch.procesos","com.mycompany.batch.reader","com.mycompany.batch.writers"})
@PropertySource("file:pruebas.properties")
public class ConfigJobBatch {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("sqlserverDataSource")
    private DataSource dataSource;

    @Autowired
    Environment envws;


    @Bean(name = "demoPartitionStep")
    public Step step1Manager(Step slaveStep) {
        return stepBuilderFactory.get("step1.manager")
            .<String, String>partitioner("step1", demoPartitioner())
            .step(slaveStep)
            .gridSize(numerohilos())
            .taskExecutor(taskExecutor())
            .build();
    }

    @Bean(name = "demoPartitioner", destroyMethod = "")
    public Partitioner demoPartitioner() {
        RangePartitioner partitioner = new RangePartitioner();
        return partitioner;
    }

    // slave step
    @Bean 
    public Step slaveStep(ItemReader<beangenerico> demoReader,ItemWriter BeanAccCollectionWriter)
    {
        return stepBuilderFactory.get("slaveStep")
                .chunk(1)
                .reader(demoReader)
                .processor(compositeProcessor())
                .writer(BeanAccCollectionWriter)
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public CompositeItemProcessor compositeProcessor() {

        List<ItemProcessor> delegates = new ArrayList<>(2);
        delegates.add(CustomerProccesor());
        delegates.add(beanDataItemProccesor());
        CompositeItemProcessor processor = new CompositeItemProcessor();

        processor.setDelegates(delegates);
        return processor;
    }

    /***FIXME debemos instanciar los processor como spring bean sino el spring no lee y no toma en cuenta la capa dao o service***/
    @Bean
    public CustomerItemProcessor CustomerProccesor(){
        return new CustomerItemProcessor();
    }

    @Bean
    public beanDataItemProccesor beanDataItemProccesor(){
        return new beanDataItemProccesor();
    }

    /***FIXME debemos instanciar los processor como bean sino no toma en cuenta la capa dao o service***/
    @Bean
    public CustomItemProcessListener listener() {
        return new CustomItemProcessListener();
    }

    @Bean(name = "demoWriter")
    @StepScope
    public ItemWriter< beangenerico> CustomItemWriter() {
        // TODO Auto-generated method stub
        CustomItemWriter wri = new CustomItemWriter();
        return wri;
    }

    @Bean(name = "testWriter")
    @StepScope
    public ItemWriter<ThreadLocal<CopyOnWriteArrayList<beangen>>> testItemWriter() {
        // TODO Auto-generated method stub
        TestWriter wri = new TestWriter();
        return wri;
    }

    @Bean(name = "BeanAccCollectionWriter")
    @StepScope
    public ItemWriter<ThreadLocal<List<NewMessage>>> BeanAccItemWriter() {  
        // TODO Auto-generated method stub
        BeanAccItemWriter wri = new BeanAccItemWriter();
        return wri;
    }

    @Bean(name="flatFileItemWriterPartition")
    @StepScope
    public FlatFileItemWriter<beangen> slaveWriter(
            @Value("#{stepExecutionContext[fromId]}") int fromId,@Value("#{stepExecutionContext[toId]}")int toId ) {
      FlatFileItemWriter<beangen> reader = new FlatFileItemWriter<beangen>();
      reader.setResource(new FileSystemResource(
          "csv/users.processed" + fromId + "-" + toId + ".csv"));
      //reader.setAppendAllowed(false);
      reader.setLineAggregator(new DelimitedLineAggregator<beangen>() {{
        setDelimiter(",");
        setFieldExtractor(new BeanWrapperFieldExtractor<beangen>() {{
          setNames(new String[]{"usernetwork","cuenta","atributo","atributo2"});
        }});
      }});
      return reader;
    }

    @Bean(name="tempRecordsWriter")
    @StepScope
    public ListDelegateWriter ListDelegateWriter(@Qualifier("flatFileItemWriterPartition")FlatFileItemWriter<beangen> writer) {
        // TODO Auto-generated method stub
        ListDelegateWriter wri = new ListDelegateWriter();
        wri.setDelegate(writer);

        return wri;
    }

    @Bean(name = "demoReader")
    @StepScope
    public ItemReader<beangenerico> myreader(@Value("#{stepExecutionContext['fromId']}") int minValue,@Value("#{stepExecutionContext['toId']}") int maxValue){
        Myreader fr = new Myreader(minValue,maxValue);
        return fr;
    }

    @Bean
    public TaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor("spring_batch");
    }

    @Bean
    public Job job(@Qualifier("demoPartitionStep") Step demoPartitionStep) {
        return this.jobBuilderFactory.get("job")
                .start(demoPartitionStep)
                .build();
    }

    @Bean
    public StepExecuListner steplistener() {
        return new StepExecuListner();
    }

    public static int numerohilos() {
        /****ciclo para hilos usando rango y numero de hilos a calcular***
          **************N_threads = N_cpu * U_cpu * (1 + W / C) *************************************
          ***N_cpu = Runtime.getRuntime().availableProcessors()**
          *******/
        int numcpu = Runtime.getRuntime().availableProcessors();
        int numthread = numcpu*1*(1+10);
        int gridSize=numthread;
        return gridSize;
    }
}

1 Ответ

0 голосов
/ 23 июня 2020

Исправляю свой код, это помогло.

public class data implements ItemProcessor<beangenerico,ThreadLocal<List<beanAccountCollect>>> {

    Logger logger = Logger.getLogger(data.class);
    
    private String SP_SQL = "{call GetDetailAccount(?)}";
    private String SELECT = "{call myspbyblocks (?,?)}";
    private beanAccountCollect b;
    private ThreadLocal<List<beanAccountCollect>> listbeanAccC = new ThreadLocal<List<beanAccountCollect>>();
    private ThreadLocal<List<beanCustomer>> listbeanc=new ThreadLocal<List<beanCustomer>>();
    
    @Autowired
    private  JdbcTemplate jdbcTemplate;
    
    @Override
    public ThreadLocal<List<beanAccountCollect>> process(beangenerico rangos)  {
        // TODO Auto-generated method stub

        List<beanParameter> lbeanPar=null;
        listbeanAccC.set(new ArrayList<beanAccountCollect>());
        
        try {
        listbeanc = this.jdbcTemplate.query(SELECT,new Object [] {rangos.getIni(),rangos.getFin()},new CustomerResultSetExtractor());
        
            for(beanCustomer bc : listbeanc.get())  {

                b = new beanAccountCollect();
                lbeanPar = new ArrayList<beanParameter>();
                
                lbeanPar = this.jdbcTemplate.query(SP_SQL,new Object [] {bc.getCuenta()},new BeanAccountResulSetExtractor(this.jdbcTemplate));
                
                b.setUsernetwork(bc.getUsernetwork());
                b.setTipoagente(bc.getTipoagente());
                b.setLbpar(lbeanPar);
                                                        
                listbeanAccC.get().add(b);
            }

        }catch (Exception e) {
            logger.error(e);
        }

        return listbeanAccC;
    }

    public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }
}
...