У меня есть приложение Spring Batch, которое читает файл osm-billers.csv. Когда я запускаю приложение, оно обрабатывает записи, доступные в CSV-файле. Затем я изменил содержимое файла и сохранил его. Но это все еще читает старое содержание. Ранее он читал файл без проблем, а теперь выдает проблему, как будто есть проблема с кэшированием. Мой CSV-файл содержит только 3 или 4 записи.
BillerOrderId 1001289463281044 1001289073251049 1000819614021112 000000002
public class BatchConfiguration {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
private CsvFileToDatabaseJobConfig csvFileToDatabaseJobConfig;
@Autowired
private DatabaseToCsvFileJobConfig databaseToCsvFileJobConfig;
@Bean
public FlatFileItemReader<Biller> reader(){
try {
FlatFileItemReader<Biller> itemReader = csvFileToDatabaseJobConfig.csvFileItemReader();
return itemReader ;
} catch (UnexpectedInputException e) {
throw new OrderBatchException("Invalid Input..." + e.getMessage());
} catch (ParseException e) {
throw new OrderBatchException("Parsing error..." + e.getMessage());
} catch (NonTransientResourceException e) {
throw new OrderBatchException("NonTransientReasource error..." + e.getMessage());
} catch (Exception e) {
throw new OrderBatchException("Unknown Read error..." + e.getMessage());
}
}
@Bean
public OrderProcessor processor() {
return new OrderProcessor();
}
@Bean
public ItemWriter<Biller> writer() {
try {
ItemWriter<Biller> itemWriter = databaseToCsvFileJobConfig.databaseCsvItemWriter();
return itemWriter;
} catch (Exception e) {
throw new OrderBatchException("Unknown Write error..." + e.getMessage());
}
}
@Bean
public Job importJobOrder(JobCompletionNotificationListner listener, Step step1) {
return jobBuilderFactory.get("importJobOrder")
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1)
.end()
.build();
}
@Bean
public Step step1(ItemWriter<Biller> writer) {
return stepBuilderFactory.get("step1")
.<Biller, Biller> chunk(10)
.reader((ItemReader<? extends Biller>) reader())
.processor(processor())
.writer(writer)
.build();
}
}
public class CsvFileToDatabaseJobConfig {
@Bean
FlatFileItemReader<Biller> csvFileItemReader() {
FlatFileItemReader<Biller> csvFileReader = new FlatFileItemReader<>();
csvFileReader.setResource(new ClassPathResource("osm-billers.csv"));
csvFileReader.setLinesToSkip(1);
LineMapper<Biller> billerLineMapper = createBillerLineMapper();
csvFileReader.setLineMapper(billerLineMapper);
return csvFileReader;
}
private LineMapper<Biller> createBillerLineMapper() {
DefaultLineMapper<Biller> billerLineMapper = new DefaultLineMapper<>();
LineTokenizer billerLineTokenizer = createBillerLineTokenizer();
billerLineMapper.setLineTokenizer(billerLineTokenizer);
FieldSetMapper<Biller> billerInformationMapper = createBillerInformationMapper();
billerLineMapper.setFieldSetMapper(billerInformationMapper);
return billerLineMapper;
}
private FieldSetMapper<Biller> createBillerInformationMapper() {
BeanWrapperFieldSetMapper<Biller> billerInformationMapper = new BeanWrapperFieldSetMapper<>();
billerInformationMapper.setTargetType(Biller.class);
return billerInformationMapper;
}
private LineTokenizer createBillerLineTokenizer() {
DelimitedLineTokenizer billerLineTokenizer = new DelimitedLineTokenizer();
billerLineTokenizer.setNames(new String[] {"billerOrderId"});
return billerLineTokenizer;
}
}
public class OrderReader implements ItemReader<OrderResponse>{
private static final Logger log = LoggerFactory.getLogger(OrderReader.class);
private final String apiUrl;
private final RestTemplate restTemplate;
private OrderResponse orderResponse;
@Autowired
private OrderRequest orderRequest;
private String userName;
private String password;
public OrderReader(String apiUrl, String userName, String password, RestTemplate restTemplate, OrderRequest orderRequest) {
this.apiUrl = apiUrl;
this.restTemplate = restTemplate;
this.orderRequest = orderRequest;
this.userName = userName;
this.password = password;
}
private boolean orderisNotInitialized() {
return this.orderResponse == null;
}
private OrderResponse fetchOrderDataFromApi(OrderRequest orderRequest) {
log.debug("OrderRequest = " + orderRequest.getOrder().getBillerOrderId());
log.debug("apiUrl = " + apiUrl);
log.debug("userName = " + userName);
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));
headers.setBasicAuth(userName, password);
HttpEntity<OrderRequest> requestEntity =
new HttpEntity<OrderRequest>(orderRequest, headers);
ResponseEntity<OrderResponse> response =
restTemplate.exchange(apiUrl,HttpMethod.POST, requestEntity,OrderResponse.class);
log.debug("response = " + response);
OrderResponse orderResponse = response.getBody();
return orderResponse;
}
@Override
public OrderResponse read()
throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if (orderisNotInitialized()) {
orderResponse = fetchOrderDataFromApi(orderRequest);
}
return orderResponse;
}
}
public class OrderProcessor implements ItemProcessor<Biller, Biller>{
@Value("${osm.service.url}")
private String orderUrl;
@Value("${osm.service.username}")
private String userName;
@Value("${osm.service.password}")
private String password;
@Autowired
RestTemplate restTemplate;
@Override
public Biller process(Biller biller) throws Exception {
OrderRequest orderRequest = new OrderRequest();
Order order = new Order();
order.setBillerOrderId(biller.getBillerOrderId());
orderRequest.setOrder(order);
OrderReader osmReader = new OrderReader(orderUrl, userName, password, restTemplate, orderRequest);
OrderResponse orderResponse = osmReader.read();
if (orderResponse.getResult().equals("SUCCESS") ) {
return null;
} else {
//Failed transactions
return biller;
}
}
}
Для целей тестирования я сделал BillerOrderId в виде 4 цифр и сразу поднял его, но при изменении на 16, но я изменил значение на 16, но сразу изменился цифры, требуется время для выполнения обновленного 16 di git BillerOrderId. Работает после 4 или 5 попыток. Я пытался увидеть, сколько времени он собирает обновленные записи. Но я не видел никакой последовательности.
Спасибо, Бандита Прадхан