Spring Batch: запуск считывателя в цикле с изменением SQL-запроса и записью в разные файлы - PullRequest
0 голосов
/ 06 мая 2020

Прежде всего большое спасибо за то, что нашли время прочитать этот вопрос. Я начал использовать Spring Batch для реализации проекта, и вопрос у меня следующий. У меня есть считыватель, который в зависимости от значения переменной заполняет поля запроса и получает результаты, после чего они записываются в файл. Проблема в том, что одно из возможных значений переменной — «ALL»; в этом случае считыватель должен запускаться три раза с подстановкой в запрос полей для CUSTOMER, ACCOUNT и PRODUCT. Каждый запуск даёт свои результаты, так как запросы разные, и их нужно записывать в разные файлы.

Есть ли способ сделать это? Насколько я могу судить, изменить задание во время его выполнения нельзя, поэтому я не могу добавить к нему ещё один шаг в зависимости от значения «ALL».

Большое спасибо за ваше время.

Ресурс

/**
     * Builds the resource for today's output file.
     * <p>
     * The file name is the configured output folder
     * ({@code uy.com.antel.up.data.folder.out}) followed by
     * {@code webUserbatch-<dd-MM-yyyy>.txt}. The folder value is concatenated as-is,
     * so it is expected to end with a path separator.
     * <p>
     * The file is created when it does not exist yet. The original code tested
     * {@code file.exists()} instead, which made {@code createNewFile()} a no-op in
     * every case.
     *
     * @return a file-system resource pointing at the output file
     */
    public Resource outputResource() {
        String outputDir = env.getProperty("uy.com.antel.up.data.folder.out");
        DateFormat hourdateFormat = new SimpleDateFormat("dd-MM-yyyy");
        String outputName = outputDir + "webUserbatch-" + hourdateFormat.format(new Date()) + ".txt";
        File file = new File(outputName);
        if (!file.exists()) {
            try {
                file.createNewFile();
            } catch (IOException e) {
                // Best effort, as in the original: a failure here is reported but does not
                // abort bean creation.
                e.printStackTrace();
            }
        }
        return new FileSystemResource(file);
    }

Читатель

/**
     * Builds the step-scoped JDBC cursor reader whose SQL depends on the job parameters.
     * <p>
     * Job parameters read from {@code map}: {@code urn}, {@code fechaInicio},
     * {@code fechaFin}, {@code ultimafecha} and {@code tabla_consultar}. A missing
     * parameter is treated as an empty string.
     * <p>
     * Filter selection, in order of precedence:
     * <ol>
     * <li>property {@code uy.com.antel.up.tipo.carga} equals "inicial" (ignoring case):
     * every event up to the current date;</li>
     * <li>no urn and no date range, but {@code ultimafecha} given: events on exactly
     * that date;</li>
     * <li>no urn, no date range and no {@code ultimafecha}: the whole previous
     * calendar month;</li>
     * <li>otherwise: each of {@code fechaInicio} (lower bound), {@code fechaFin}
     * (upper bound) and {@code urn} (substring match) that is present.</li>
     * </ol>
     * <p>
     * Fixes relative to the previous version: the URN filter now uses the alias of the
     * selected reference table instead of the hard-coded {@code pr}; the
     * {@code ultimafecha} case is reachable; a missing {@code tipo.carga} property no
     * longer causes a NullPointerException; the row-counting pass stops on the first
     * read error instead of looping forever and always closes the reader.
     * <p>
     * NOTE(review): parameter values are concatenated into the SQL text. If job
     * parameters can come from an untrusted source this is an SQL injection risk and
     * should be replaced by bind parameters.
     *
     * @param dataSource data source the reader queries
     * @param map        job parameters, injected by Spring Batch
     * @return the configured reader, closed after the counting pass
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    @Bean(destroyMethod = "")
    @StepScope
    public JdbcCursorItemReader reader(DataSource dataSource, @Value("#{jobParameters}") Map<String, Object> map) {
        JdbcCursorItemReader reader = new JdbcCursorItemReader();
        reader.setDataSource(dataSource);
        reader.setRowMapper(new UsuariosProductosRowMapper());

        String urn = textOrEmpty(map.get("urn"));
        String fechaInicio = textOrEmpty(map.get("fechaInicio"));
        String fechaFin = textOrEmpty(map.get("fechaFin"));
        String ultimafecha = textOrEmpty(map.get("ultimafecha"));
        String tabla_consultar = textOrEmpty(map.get("tabla_consultar"));

        // Any value other than "inicial" enables the custom filters below.
        String tipoGeneracion = env.getProperty("uy.com.antel.up.tipo.carga");

        LOG.info("URN: " + urn);
        LOG.info("Fecha desde: " + fechaInicio);
        LOG.info("Fecha hasta: " + fechaFin);
        LOG.info("Ultima fecha: " + ultimafecha);
        LOG.info("Tabla consultar: " + tabla_consultar);

        String table_online_id = "";
        String table_reference = "";
        String shorcut_table_online = "";
        String shorcut_table_reference = "";
        String attribute_table_online = "";

        if (tabla_consultar.equals(CommandArgument.ALL.name())) {
            // NOTE(review): ALL is not supported by a single reader. The table names stay
            // empty here, so the SQL generated below is not valid. Running CUSTOMER,
            // ACCOUNT and PRODUCT needs one reader (or job) per table.
        } else if (tabla_consultar.equals(CommandArgument.CUSTOMER.name())) {
            table_online_id = "online_id_customer";
            table_reference = "customer_reference";
            shorcut_table_online = "oidc";
            attribute_table_online = "customer";
            shorcut_table_reference = "cr";
        } else if (tabla_consultar.equals(CommandArgument.PRODUCT.name())) {
            table_online_id = "online_id_product";
            table_reference = "product_reference";
            shorcut_table_online = "oidp";
            attribute_table_online = "product";
            shorcut_table_reference = "pr";
        } else {
            // Any other value, including a missing parameter, queries the account tables.
            table_online_id = "online_id_account";
            table_reference = "account_reference";
            shorcut_table_online = "oida";
            attribute_table_online = "account";
            shorcut_table_reference = "ar";
        }

        // Part shared by every variant of the query: projection, joins and validity check.
        StringBuilder sql = new StringBuilder()
                .append("select ul.user_name  as \"userName\"," + shorcut_table_reference + ".urn as \"urn\", ace.wbu_action as \"action\", al.name as \"accreditation_level\", ace.date as \"fecha\"\r\n")
                .append(" from " + table_online_id + " " + shorcut_table_online + ", online_entity_association oea, accreditation_event ace, " + table_reference + " " + shorcut_table_reference + ", user_login ul, acreditation_level al \r\n")
                .append(" where " + shorcut_table_online + ".id=oea.id and \r\n")
                .append(" oea.acreditacion=ace.id and \r\n")
                .append(" " + shorcut_table_online + "." + attribute_table_online + "=" + shorcut_table_reference + ".id and \r\n")
                .append(" ul.online_identity=" + shorcut_table_online + ".online_identity and \r\n")
                .append(" ace.level=al.id and \r\n")
                .append(" oea.valid=true");

        boolean sinFiltros = urn.isEmpty() && fechaInicio.isEmpty() && fechaFin.isEmpty();

        if ("inicial".equalsIgnoreCase(tipoGeneracion)) {
            sql.append(" and ace.date<='").append(new Date()).append("'");
        } else if (sinFiltros && !ultimafecha.isEmpty()) {
            sql.append(" and ace.date='").append(ultimafecha).append("'");
        } else if (sinFiltros) {
            // Default window: first to last day of the previous calendar month.
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
            Calendar cal = Calendar.getInstance();
            cal.add(Calendar.MONTH, -1);
            cal.set(Calendar.DATE, 1);
            String fechaDesde = sdf.format(cal.getTime());
            cal.set(Calendar.DATE, cal.getActualMaximum(Calendar.DATE));
            String fechaHasta = sdf.format(cal.getTime());

            sql.append(" and ace.date>='").append(fechaDesde).append("'");
            sql.append(" and ace.date<='").append(fechaHasta).append("'");
        } else {
            if (!fechaInicio.isEmpty()) {
                sql.append(" and ace.date>='").append(fechaInicio).append("'");
            }
            if (!fechaFin.isEmpty()) {
                sql.append(" and ace.date<='").append(fechaFin).append("'");
            }
            if (!urn.isEmpty()) {
                sql.append(" and ").append(shorcut_table_reference).append(".urn like '%").append(urn).append("%'");
            }
        }

        reader.setSql(sql.toString());
        LOG.info("SQL: " + reader.getSql());

        // Counting pass: run the query once only to log how many rows it returns.
        int counter = 0;
        reader.open(new ExecutionContext());
        try {
            while (reader.read() != null) {
                counter++;
            }
        } catch (Exception e) {
            LOG.error("Error al contar los registros", e);
        } finally {
            reader.close();
        }

        LOG.info("Registros procesados: " + counter);

        return reader;
    }

    /** Returns the string form of a job parameter value, or an empty string when it is absent. */
    private static String textOrEmpty(Object value) {
        return value == null ? "" : value.toString();
    }

Writer

/**
     * Builds the step-scoped flat-file writer for {@code UsuariosProductos} items.
     * <p>
     * Each item is written as one comma-delimited line with the fields
     * {@code username, urn, fecha, action, accreditation_level}. A header line is
     * written first and a two-line footer last. The target file comes from
     * {@link #outputResource()}.
     * <p>
     * The previous version called {@code open()} before the aggregator and callbacks
     * were set, and {@code close()} before returning. The returned writer was therefore
     * already closed, and the file had been touched during bean creation. This method
     * now only configures the writer; opening and closing are left to the step.
     * <p>
     * NOTE(review): the declared return type is the plain {@code ItemWriter} interface.
     * Confirm that the step opens the writer; Spring Batch can only do so automatically
     * when it sees the bean as an {@code ItemStream}.
     *
     * @return the configured, not yet opened, writer
     * @throws Exception kept for signature compatibility
     */
    @Bean
    @StepScope
    public ItemWriter<UsuariosProductos> writer() throws Exception {
        FlatFileItemWriter<UsuariosProductos> writer = new FlatFileItemWriter<>();
        writer.setResource(this.outputResource());

        BeanWrapperFieldExtractor<UsuariosProductos> fieldExtractor = new BeanWrapperFieldExtractor<>();
        fieldExtractor.setNames(new String[] { "username", "urn", "fecha", "action", "accreditation_level"});

        DelimitedLineAggregator<UsuariosProductos> delLineAgg = new DelimitedLineAggregator<UsuariosProductos>();
        delLineAgg.setDelimiter(",");
        delLineAgg.setFieldExtractor(fieldExtractor);
        writer.setLineAggregator(delLineAgg);

        // Header: column titles followed by a dashed rule.
        writer.setHeaderCallback(new FlatFileHeaderCallback() {
            @Override
            public void writeHeader(Writer out) throws IOException {
                out.write("------USERNAME------,--------URN--------,------------------FECHA------------------,--------ACTION--------,-------------------------ACCREDITATION_LEVEL-----");
                out.write("----------------------------------------");
            }
        });

        // Footer: dashed rule and end-of-file marker.
        writer.setFooterCallback(new FlatFileFooterCallback() {
            @Override
            public void writeFooter(Writer out) throws IOException {
                out.write("----------------------------------------\r\n" +
                        "----- FIN ARCHIVO DE PROCESADOS OK -----");
            }
        });

        return writer;
    }

Job

/**
     * Declares the {@code usuariosProductosJob} job: a flow with the single step
     * {@code step1}, a run-id incrementer and the given execution listener.
     *
     * @param listener listener registered on the job
     * @param step1    the only step of the flow
     * @return the assembled job
     */
    @Bean
    public Job usuariosProductosJob(JobListener listener, Step step1) {
        return jobBuilderFactory
                .get("usuariosProductosJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(step1)
                .end()
                .build();
    }

    /**
     * Declares the chunk-oriented step {@code step1}, wiring reader, processor and
     * writer. The chunk size is taken from {@link #obtenerChunk()}.
     *
     * @param stepBuilderFactory factory used to create the step builder
     * @param reader             source of {@code UsuariosProductos} items
     * @param writer             sink for the processed items
     * @param processor          transformation applied to each item
     * @return the assembled step
     */
    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<UsuariosProductos> reader,
            ItemWriter<UsuariosProductos> writer, ItemProcessor<UsuariosProductos, UsuariosProductos> processor) {

        return stepBuilderFactory
                .get("step1")
                .<UsuariosProductos, UsuariosProductos>chunk(obtenerChunk())
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }


    /**
     * Reads the chunk size from property {@code uy.com.antel.up.chunk.step}.
     *
     * @return the property value parsed as an integer
     * @throws NumberFormatException if the property is missing or not a valid integer
     */
    public Integer obtenerChunk() {
        return Integer.valueOf(env.getProperty("uy.com.antel.up.chunk.step"));
    }

UsuariosProductosRowMapper.class

/**
 * Maps one row of the accreditation query to a {@code UsuariosProductos} item.
 * <p>
 * Columns are read by label: {@code userName}, {@code urn}, {@code fecha},
 * {@code action} and {@code accreditation_level}. On the last row of the result
 * set the row number is also stored in the item as the processed count.
 */
public class UsuariosProductosRowMapper implements RowMapper<UsuariosProductos>{

    private static final Logger LOG = LoggerFactory.getLogger(UsuariosProductosRowMapper.class);

    private static final String USERNAME = "userName";
    private static final String URN = "urn";
    private static final String FECHA = "fecha";
    private static final String ACTION = "action";
    private static final String ACCREDITATION_LEVEL = "accreditation_level";


    /**
     * Maps the current row.
     * <p>
     * A null or unreadable {@code fecha} column leaves the item's date unset instead
     * of failing the row. The previous version reached the same result through a
     * swallowed NullPointerException.
     *
     * @param rs     result set positioned on the row to map
     * @param rowNum number of the current row (not used)
     * @return the populated item
     * @throws SQLException if a column other than {@code fecha} cannot be read
     */
    @Override
    public UsuariosProductos mapRow(ResultSet rs, int rowNum) throws SQLException {
        String action = rs.getString(ACTION);
        String accreditationLevel = rs.getString(ACCREDITATION_LEVEL);

        UsuariosProductos up = new UsuariosProductos();
        up.setUsername(rs.getString(USERNAME));
        up.setUrn(rs.getString(URN));
        up.setAction(action);
        up.setAccreditation_level(accreditationLevel);

        LOG.info("Estoy en UsuariosProductosRow");

        LOG.info("resultSet: {}", rs);
        LOG.info("action: {}", action);
        LOG.info("accreditation_level: {}", accreditationLevel);

        try {
            java.sql.Timestamp fecha = rs.getTimestamp(FECHA);
            LOG.info("Fecha java sql: {}", fecha);
            if (fecha != null) {
                up.setFecha(fecha.toString());
            }
        } catch (SQLException e) {
            // Best effort, as before: the row is still returned, just without a date.
            LOG.warn("No se pudo leer la columna " + FECHA, e);
        }

        // NOTE(review): isLast() is optional for forward-only result sets; confirm the
        // driver in use supports it with a cursor reader.
        if (rs.isLast()) {
            up.setProcesados(rs.getRow());
        }
        return up;
    }

}

Processor

/**
 * Item processor that copies each field of the incoming item into a new
 * {@code UsuariosProductos}, prefixing every value with its label.
 */
public class UsuariosProductosProcessor implements ItemProcessor<UsuariosProductos, UsuariosProductos>{

    private static final Logger LOG = LoggerFactory.getLogger(UsuariosProductosProcessor.class);

    /**
     * Produces the labelled copy of {@code item}.
     *
     * @param item item read from the database
     * @return a new item whose text fields carry a "label: value " form; a null user
     *         name becomes "userName: Not found "
     */
    @Override
    public UsuariosProductos process(UsuariosProductos item) throws Exception {
        UsuariosProductos etiquetado = new UsuariosProductos();

        String nombre = item.getUsername();
        etiquetado.setUsername(nombre != null
                ? "userName: " + nombre + " "
                : "userName: Not found ");

        // The remaining fields are labelled as-is; a null value is rendered as "null".
        etiquetado.setUrn("urn: " + item.getUrn() + " ");
        etiquetado.setFecha("fechaAcreditación: " + item.getFecha() + " ");
        etiquetado.setAction("action: " + item.getAction() + " ");
        etiquetado.setAccreditation_level("accreditation_level: " + item.getAccreditation_level() + " ");

        return etiquetado;
    }

}

1 Ответ

0 голосов
/ 11 мая 2020

Основываясь на комментариях Махмуда Бен Хассина, я решил использовать несколько заданий, которые выполняют разные задачи, но без нарушения уже существовавшей в коде структуры, поскольку программное обеспечение, над которым я работаю, принадлежит клиенту, и я не знаю, насколько сильно могу его менять.

Я сделал следующее: определил переменную, которая принимает аргумент, переданный при запуске jar-файла, и запускает связанное с ним задание. В случае ALL задания выполняются одно за другим.

// Launch the job that matches the "tabla_consultar" argument; nothing runs when it is absent.
if (map.get("tabla_consultar") != null) {
            type_job = map.get("tabla_consultar").toString();
            if (type_job.equalsIgnoreCase(CommandArgument.ALL.name())) {
                // ALL: run the three jobs one after another, overwriting the parameter
                // with the table each job has to query.
                String[][] jobsByTable = {
                        { CommandArgument.ACCOUNT.name(), "usuariosAccountJob" },
                        { CommandArgument.CUSTOMER.name(), "usuariosCustomerJob" },
                        { CommandArgument.PRODUCT.name(), "usuariosProductosJob" } };
                for (String[] tableAndJob : jobsByTable) {
                    map.put("tabla_consultar", new JobParameter(tableAndJob[0]));
                    executeJob(ctx.getBean(tableAndJob[1], Job.class), jobLauncher, map);
                }
            } else {
                // Single table: any value other than CUSTOMER or ACCOUNT runs the product job.
                String jobBeanName;
                if (type_job.equalsIgnoreCase(CommandArgument.CUSTOMER.name())) {
                    jobBeanName = "usuariosCustomerJob";
                } else if (type_job.equalsIgnoreCase(CommandArgument.ACCOUNT.name())) {
                    jobBeanName = "usuariosAccountJob";
                } else {
                    jobBeanName = "usuariosProductosJob";
                }
                executeJob(ctx.getBean(jobBeanName, Job.class), jobLauncher, map);
            }
        }

В BatchConfiguration у меня есть переменная, которая проверяет, какой тип работы вы хотите выполнить, и исходя из этого заполняет поля запроса, относящиеся к таблице и атрибутам, связанным с этим заданием.

    String table_online_id = "";
    String table_reference = "";
    String shorcut_table_online = "";
    String shorcut_table_reference = "";
    String attribute_table_online ="";

    if (tabla_consultar.equalsIgnoreCase(CommandArgument.CUSTOMER.name())) {
        table_online_id = "online_id_customer";
        table_reference = "customer_reference";
        shorcut_table_online = "oidc";
        attribute_table_online = "customer";
        shorcut_table_reference = "cr";
    }else if (tabla_consultar.equalsIgnoreCase(CommandArgument.PRODUCT.name())) {
        table_online_id = "online_id_product";
        table_reference = "product_reference";
        shorcut_table_online = "oidp";
        attribute_table_online = "product";
        shorcut_table_reference = "pr";
    }else {
        table_online_id = "online_id_account";
        table_reference = "account_reference";
        shorcut_table_online = "oida";
        attribute_table_online = "account";
        shorcut_table_reference = "ar";
    }

Например:

// Example query built from the table/alias variables chosen above.
// It selects user name, URN, action, accreditation level code and event date,
// joining the selected online-id table, online_entity_association, accreditation_event,
// the selected reference table, user_login and acreditation_level, and keeps only
// events dated before `date`.
// NOTE(review): `date` is concatenated through its toString(); confirm the database
// accepts that textual format.
reader.setSql("select ul.user_name  as \"userName\"," + shorcut_table_reference +".urn as \"urn\", ace.wbu_action as \"action\", al.code as \"accreditation_level\", ace.date as \"fecha\"\r\n" 
                    + " from " + table_online_id +" "+ shorcut_table_online +", online_entity_association oea, accreditation_event ace, " + table_reference + " " +  shorcut_table_reference +", user_login ul, acreditation_level al \r\n"
                    + " where " + shorcut_table_online+".id=oea.id and \r\n" + " oea.id=ace.id and \r\n"
                    + " "+shorcut_table_online+"."+attribute_table_online+"=" + shorcut_table_reference +".id and \r\n" + " ul.online_identity="+shorcut_table_online+".online_identity and \r\n" + " ace.level=al.id and \r\n"
                    + " ace.date<'" + date + "'");

Наконец, переменная tabla_consultar используется как часть имени файла: так я могу быть уверен, что если запрос выполняется для CUSTOMER, файл будет называться xxxx-Customer-xxxx, а для ACCOUNT или PRODUCT имя изменится, и файлы не будут перезаписывать друг друга. Того же можно было добиться, добавив в имя значение чч:мм:сс.

// File name layout: <outputDir>wxxxxxxx-<table>-<yyyy-MM-dd HH-mm-ss>.txt, so each
// table and each run gets its own file.
DateFormat hourdateFormat = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss");
        String outputName = String.format("%swxxxxxxx-%s-%s.txt",
                outputDir, tabla_consultar, hourdateFormat.format(date));
        File file = new File(outputName);
...