Прежде всего, большое спасибо за то, что нашли время прочитать этот вопрос. Я начал использовать Spring Batch для реализации проекта, и мой вопрос следующий. У меня есть считыватель (reader), который в зависимости от значения переменной заполняет поля запроса и получает результаты, после чего они записываются в файл. Проблема в том, что одним из возможных значений переменной является «ALL»: в этом случае считыватель должен запускаться три раза, с подстановкой в запрос полей для CUSTOMER, ACCOUNT и PRODUCT. Это даст разные результаты, так как это разные запросы, и их нужно записывать в разные файлы.
Есть ли способ сделать это? Насколько я могу судить, я не могу изменить задание во время его выполнения, поэтому не могу добавить к нему ещё один шаг в зависимости от значения «ALL».
Большое спасибо за ваше время.
Ресурс
/**
 * Builds the resource the flat-file writer writes to.
 *
 * <p>The file name is the value of the {@code uy.com.antel.up.data.folder.out}
 * property, followed by {@code webUserbatch-}, the current date formatted as
 * {@code dd-MM-yyyy}, and {@code .txt}. The property value is used as a plain
 * prefix: no path separator is inserted between it and the file name.
 *
 * @return a file-system resource pointing at today's output file
 */
public Resource outputResource() {
    String outputDir = env.getProperty("uy.com.antel.up.data.folder.out");
    DateFormat dayFormat = new SimpleDateFormat("dd-MM-yyyy");
    String outputName = outputDir + "webUserbatch-" + dayFormat.format(new Date()) + ".txt";
    File file = new File(outputName);
    // The original condition was inverted (it called createNewFile() only when
    // the file already existed, which is a no-op). Create it when it is missing.
    if (!file.exists()) {
        try {
            file.createNewFile();
        } catch (IOException e) {
            // Best effort, as before: the failure is reported and the resource
            // is still returned so the caller decides what to do with it.
            LOG.warn("No se pudo crear el archivo de salida " + outputName, e);
        }
    }
    return new FileSystemResource(file);
}
Читатель
/**
 * Step-scoped JDBC cursor reader whose query is assembled from the job parameters.
 *
 * <p>Job parameters read (all optional, missing values are treated as empty):
 * {@code urn}, {@code fechaInicio}, {@code fechaFin}, {@code ultimafecha} and
 * {@code tabla_consultar}. {@code tabla_consultar} selects the entity tables:
 * {@code CUSTOMER}, {@code PRODUCT}, or anything else for the account tables.
 *
 * <p>Filter selection:
 * <ul>
 *   <li>property {@code uy.com.antel.up.tipo.carga} equals {@code inicial}
 *       (case-insensitive): everything up to the current instant;</li>
 *   <li>no {@code urn}, {@code fechaInicio} or {@code fechaFin}: the exact
 *       {@code ultimafecha} when it is given, otherwise the previous calendar month;</li>
 *   <li>otherwise: each of {@code fechaInicio}, {@code fechaFin} and {@code urn}
 *       that is present adds its own condition.</li>
 * </ul>
 *
 * <p>Before returning, the reader is opened once, fully consumed to log the
 * number of rows, and closed again, so the query runs twice per step.
 *
 * @param dataSource data source the cursor is opened on
 * @param map        the job parameters of the running job
 * @return the configured reader
 * @throws UnsupportedOperationException if {@code tabla_consultar} is {@code ALL}
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean(destroyMethod = "")
@StepScope
public JdbcCursorItemReader reader(DataSource dataSource, @Value("#{jobParameters}") Map<String, Object> map) {
    JdbcCursorItemReader reader = new JdbcCursorItemReader();
    reader.setDataSource(dataSource);
    reader.setRowMapper(new UsuariosProductosRowMapper());

    String urn = textParameter(map, "urn");
    String fechaInicio = textParameter(map, "fechaInicio");
    String fechaFin = textParameter(map, "fechaFin");
    String ultimafecha = textParameter(map, "ultimafecha");
    String tabla_consultar = textParameter(map, "tabla_consultar");
    String tipoGeneracion = env.getProperty("uy.com.antel.up.tipo.carga");

    LOG.info("URN: " + urn);
    LOG.info("Fecha desde: " + fechaInicio);
    LOG.info("Fecha hasta: " + fechaFin);
    LOG.info("Ultima fecha: " + ultimafecha);
    LOG.info("Tabla consultar: " + tabla_consultar);

    // A single reader instance runs a single query. Previously ALL left every
    // table name empty and produced invalid SQL; fail with a clear message instead.
    // TODO: run CUSTOMER, ACCOUNT and PRODUCT as separate steps, each with its own
    // reader and output file, and route ALL to all three at job level.
    if (CommandArgument.ALL.name().equals(tabla_consultar)) {
        throw new UnsupportedOperationException(
                "tabla_consultar=ALL is not supported by a single reader; "
                        + "run one step per entity (CUSTOMER, ACCOUNT, PRODUCT)");
    }

    String table_online_id;
    String table_reference;
    String shorcut_table_online;
    String shorcut_table_reference;
    String attribute_table_online;
    if (tabla_consultar.equals(CommandArgument.CUSTOMER.name())) {
        table_online_id = "online_id_customer";
        table_reference = "customer_reference";
        shorcut_table_online = "oidc";
        attribute_table_online = "customer";
        shorcut_table_reference = "cr";
    } else if (tabla_consultar.equals(CommandArgument.PRODUCT.name())) {
        table_online_id = "online_id_product";
        table_reference = "product_reference";
        shorcut_table_online = "oidp";
        attribute_table_online = "product";
        shorcut_table_reference = "pr";
    } else {
        // Any other value (including a missing parameter) queries the account tables.
        table_online_id = "online_id_account";
        table_reference = "account_reference";
        shorcut_table_online = "oida";
        attribute_table_online = "account";
        shorcut_table_reference = "ar";
    }

    // Projection and joins are identical for every filter combination.
    StringBuilder sql = new StringBuilder(
            "select ul.user_name as \"userName\"," + shorcut_table_reference + ".urn as \"urn\", ace.wbu_action as \"action\", al.name as \"accreditation_level\", ace.date as \"fecha\"\r\n"
            + " from " + table_online_id + " " + shorcut_table_online + ", online_entity_association oea, accreditation_event ace, " + table_reference + " " + shorcut_table_reference + ", user_login ul, acreditation_level al \r\n"
            + " where " + shorcut_table_online + ".id=oea.id and \r\n"
            + " oea.acreditacion=ace.id and \r\n"
            + " " + shorcut_table_online + "." + attribute_table_online + "=" + shorcut_table_reference + ".id and \r\n"
            + " ul.online_identity=" + shorcut_table_online + ".online_identity and \r\n"
            + " ace.level=al.id and \r\n"
            + " oea.valid=true");

    // NOTE(review): date values are still concatenated into the SQL text because
    // their expected format/type is not visible here; bind them as parameters
    // once that is confirmed.
    // Constant-first comparison: a missing property no longer throws a NullPointerException.
    if ("inicial".equalsIgnoreCase(tipoGeneracion)) {
        sql.append(" and ace.date<='").append(new Date()).append("'");
    } else if (urn.isEmpty() && fechaInicio.isEmpty() && fechaFin.isEmpty()) {
        if (!ultimafecha.isEmpty()) {
            // This case used to be unreachable: it sat after a branch with the
            // same urn/fechaInicio/fechaFin condition.
            sql.append(" and ace.date='").append(ultimafecha).append("'");
        } else {
            // Default window: first to last day of the previous calendar month.
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
            Calendar cal = Calendar.getInstance();
            cal.add(Calendar.MONTH, -1);
            cal.set(Calendar.DATE, 1);
            String fechaDesde = sdf.format(cal.getTime());
            cal.set(Calendar.DATE, cal.getActualMaximum(Calendar.DATE));
            String fechaHasta = sdf.format(cal.getTime());
            sql.append(" and ace.date>='").append(fechaDesde)
                    .append("' and ace.date<='").append(fechaHasta).append("'");
        }
    } else {
        if (!fechaInicio.isEmpty()) {
            sql.append(" and \r\n ace.date>='").append(fechaInicio).append("'");
        }
        if (!fechaFin.isEmpty()) {
            sql.append(" and \r\n ace.date<='").append(fechaFin).append("'");
        }
        if (!urn.isEmpty()) {
            // Use the alias of the selected reference table (was hard-coded to "pr",
            // which only exists in the PRODUCT query) and bind the value instead of
            // concatenating it into the statement.
            sql.append(" and ").append(shorcut_table_reference).append(".urn like ?");
            final String urnPattern = "%" + urn + "%";
            reader.setPreparedStatementSetter(new org.springframework.jdbc.core.PreparedStatementSetter() {
                @Override
                public void setValues(java.sql.PreparedStatement ps) throws java.sql.SQLException {
                    ps.setString(1, urnPattern);
                }
            });
        }
    }
    reader.setSql(sql.toString());
    LOG.info("SQL: " + reader.getSql());

    // Dry run to log how many rows the query yields.
    int counter = 0;
    reader.open(new ExecutionContext());
    try {
        while (reader.read() != null) {
            counter++;
        }
    } catch (Exception e) {
        // Stop at the first failure: retrying the same read could loop forever.
        LOG.error("Error al contar los registros", e);
    } finally {
        reader.close();
    }
    LOG.info("Registros procesados: " + counter);
    return reader;
}

/** Returns the named job parameter as text, or an empty string when it is absent. */
private static String textParameter(Map<String, Object> parameters, String name) {
    Object value = parameters.get(name);
    return value != null ? value.toString() : "";
}
Writer
/**
 * Step-scoped flat-file writer that emits one comma-delimited line per item,
 * framed by a fixed header and footer.
 *
 * <p>The bean is declared with its concrete type so that the step-scoped proxy
 * also exposes {@code ItemStream}; the step can then open and close the writer
 * itself. The previous version opened and closed the writer inside this factory
 * method, before the aggregator and callbacks were configured.
 *
 * @return the configured writer (not opened)
 * @throws Exception declared for compatibility with existing callers
 */
@Bean
@StepScope
public FlatFileItemWriter<UsuariosProductos> writer() throws Exception {
    FlatFileItemWriter<UsuariosProductos> writer = new FlatFileItemWriter<>();
    writer.setResource(this.outputResource());

    // Column order of each output line.
    BeanWrapperFieldExtractor<UsuariosProductos> fieldExtractor = new BeanWrapperFieldExtractor<>();
    fieldExtractor.setNames(new String[] { "username", "urn", "fecha", "action", "accreditation_level"});

    DelimitedLineAggregator<UsuariosProductos> delLineAgg = new DelimitedLineAggregator<UsuariosProductos>();
    delLineAgg.setDelimiter(",");
    delLineAgg.setFieldExtractor(fieldExtractor);
    writer.setLineAggregator(delLineAgg);

    // Header
    writer.setHeaderCallback(new FlatFileHeaderCallback() {
        @Override
        public void writeHeader(Writer out) throws IOException {
            out.write("------USERNAME------,--------URN--------,------------------FECHA------------------,--------ACTION--------,-------------------------ACCREDITATION_LEVEL-----");
            out.write("----------------------------------------");
        }
    });

    // Footer
    writer.setFooterCallback(new FlatFileFooterCallback() {
        @Override
        public void writeFooter(Writer out) throws IOException {
            out.write("----------------------------------------\r\n" +
                    "----- FIN ARCHIVO DE PROCESADOS OK -----");
        }
    });
    return writer;
}
Job
/**
 * Defines the {@code usuariosProductosJob} job: a single flow consisting of
 * {@code step1}, with a run-id incrementer and the given listener attached.
 *
 * @param listener listener registered on the job
 * @param step1    the only step of the flow
 * @return the assembled job
 */
@Bean
public Job usuariosProductosJob(JobListener listener, Step step1) {
    return jobBuilderFactory
            .get("usuariosProductosJob")
            .incrementer(new RunIdIncrementer())
            .listener(listener)
            .flow(step1)
            .end()
            .build();
}
/**
 * Defines {@code step1}: a chunk-oriented step wiring the given reader,
 * processor and writer, with the chunk size taken from {@link #obtenerChunk()}.
 *
 * @param stepBuilderFactory factory used to create the step builder
 * @param reader             item source
 * @param writer             item sink
 * @param processor          transformation applied between reader and writer
 * @return the assembled step
 */
@Bean
public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<UsuariosProductos> reader,
        ItemWriter<UsuariosProductos> writer, ItemProcessor<UsuariosProductos, UsuariosProductos> processor) {
    return stepBuilderFactory
            .get("step1")
            .<UsuariosProductos, UsuariosProductos>chunk(obtenerChunk())
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .build();
}
/**
 * Reads the chunk size from the {@code uy.com.antel.up.chunk.step} property.
 *
 * @return the property value parsed as an integer
 */
public Integer obtenerChunk() {
    return Integer.valueOf(env.getProperty("uy.com.antel.up.chunk.step"));
}
UsuariosProductosRowMapper.class
/**
 * Maps one row of the accreditation query to a {@link UsuariosProductos}.
 *
 * <p>Columns read by label: {@code userName}, {@code urn}, {@code action},
 * {@code accreditation_level} and {@code fecha}.
 */
public class UsuariosProductosRowMapper implements RowMapper<UsuariosProductos>{
    private static final Logger LOG = LoggerFactory.getLogger(UsuariosProductosRowMapper.class);
    private static final String USERNAME = "userName";
    private static final String URN = "urn";
    private static final String FECHA = "fecha";
    private static final String ACTION = "action";
    private static final String ACCREDITATION_LEVEL = "accreditation_level";

    /**
     * Builds the item for the current row.
     *
     * @param rs     result set positioned on the row to map
     * @param rowNum index of the current row (unused)
     * @return the populated item; {@code fecha} is left unset when the column is SQL NULL
     * @throws SQLException if any column cannot be read
     */
    @Override
    public UsuariosProductos mapRow(ResultSet rs, int rowNum) throws SQLException {
        // Read each column once and reuse the value for logging.
        String action = rs.getString(ACTION);
        String accreditationLevel = rs.getString(ACCREDITATION_LEVEL);

        UsuariosProductos up = new UsuariosProductos();
        up.setUsername(rs.getString(USERNAME));
        up.setUrn(rs.getString(URN));
        up.setAction(action);
        up.setAccreditation_level(accreditationLevel);

        LOG.info("Estoy en UsuariosProductosRow");
        LOG.info("resultSet: {}", rs);
        LOG.info("action: {}", action);
        LOG.info("accreditation_level: {}", accreditationLevel);

        // An explicit null check replaces the former catch-all, which relied on
        // a NullPointerException for NULL dates and also hid SQLExceptions.
        java.sql.Timestamp fecha = rs.getTimestamp(FECHA);
        LOG.info("Fecha java sql: {}", fecha);
        if (fecha != null) {
            up.setFecha(fecha.toString());
        }

        // On the last row, record the row number as the processed count.
        if (rs.isLast()) {
            up.setProcesados(rs.getRow());
        }
        return up;
    }
}
Processor
/**
 * Produces a copy of each item whose text fields are prefixed with a label
 * and suffixed with a single space.
 */
public class UsuariosProductosProcessor implements ItemProcessor<UsuariosProductos, UsuariosProductos>{
    private static final Logger LOG = LoggerFactory.getLogger(UsuariosProductosProcessor.class);

    /**
     * Builds the labelled copy of {@code item}.
     *
     * @param item the item as read
     * @return a new item carrying the labelled username, urn, fecha, action and
     *         accreditation level
     */
    @Override
    public UsuariosProductos process(UsuariosProductos item) throws Exception {
        UsuariosProductos labelled = new UsuariosProductos();

        // Only the username has an explicit placeholder for a missing value;
        // the other fields are concatenated as they are.
        String userName = item.getUsername();
        String userNameText = (userName != null)
                ? "userName: " + userName + " "
                : "userName: Not found ";
        labelled.setUsername(userNameText);

        labelled.setUrn("urn: " + item.getUrn() + " ");
        labelled.setFecha("fechaAcreditación: " + item.getFecha() + " ");
        labelled.setAction("action: " + item.getAction() + " ");
        labelled.setAccreditation_level("accreditation_level: " + item.getAccreditation_level() + " ");
        return labelled;
    }
}