Я хочу прочитать файл с ftp-сервера, затем сохранить его в локальном хранилище и удалить с сервера, запустить задание, которое читает файл, найти одну запись в БД, изменить один параметр и сохранить его.
Что идет не так: работа не заканчивается; увеличивает зарплату и экономит сотруднику много раз.
Конфигурация Spring Integration:
@Bean
public FtpInboundFileSynchronizer ftpInboundFileSynchronizer(DefaultFtpSessionFactory sessionFactory) {
FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(sessionFactory);
fileSynchronizer.setRemoteDirectory(remoteDirectory);
fileSynchronizer.setDeleteRemoteFiles(true);
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(cron = "*/5 * * * * ?"))
public FtpInboundFileSynchronizingMessageSource ftpInboundFileSynchronizingMessageSource(FtpInboundFileSynchronizer fileSynchronizer) throws Exception {
FtpInboundFileSynchronizingMessageSource messageSource = new FtpInboundFileSynchronizingMessageSource(fileSynchronizer);
messageSource.setAutoCreateLocalDirectory(true);
messageSource.setLocalDirectory(new File(localDirectory));
messageSource.setLocalFilter(new AcceptOnceFileListFilter<>());
return messageSource;
}
@Bean
@ServiceActivator(inputChannel = "fileInputChannel")
public FileWritingMessageHandler fileWritingMessageHandler() {
FileWritingMessageHandler messageHandler = new FileWritingMessageHandler(new File(localDirectory));
messageHandler.setOutputChannelName("jobLaunchRequestChannel");
return messageHandler;
}
@ServiceActivator(inputChannel = "jobLaunchRequestChannel", outputChannel = "jobLaunchingGatewayChannel")
public JobLaunchRequest jobLaunchRequest(File file) throws IOException {
String[] content = FileUtils.readFileToString(file, "UTF-8").split("\\s+");
JobParameters jobParameters = new JobParametersBuilder()
.addString("filename", file.getAbsolutePath())
.addString("id", content[0]).addString("salary", content[1])
// .addLong("time", System.currentTimeMillis())
.toJobParameters();
return new JobLaunchRequest(increaseSalaryJob, jobParameters);
}
@Bean
@ServiceActivator(inputChannel = "jobLaunchingGatewayChannel")
public JobLaunchingGateway jobLaunchingGateway(SimpleJobLauncher jobLauncher) {
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);
jobLaunchingGateway.setOutputChannelName("finish");
return jobLaunchingGateway;
}
@ServiceActivator(inputChannel = "finish")
public void finish() {
System.out.println("FINISH");
}
}
Конфигурация Spring Batch:
@Bean
public Job increaseSalaryJob(CustomJobListener listener, Step step1) {
return jobBuilderFactory.get("increaseSalaryJob")
.preventRestart()
.listener(listener)
.start(step1)
.build();
}
@Bean
public Step step1(ItemReader<Employee> reader) {
return stepBuilderFactory.get("step1")
.transactionManager(transactionManager)
.<Employee, Employee> chunk(1)
.reader(reader)
.processor(processor())
.writer(writer())
.build();
}
@Bean
@StepScope
public ItemReader<Employee> reader(@Value("#{jobParameters[id]}") Integer id) {
log.info("reader");
return () -> employeeService.get(id);
}
@Bean
@StepScope
public ItemProcessor<Employee, Employee> processor() {
log.info("processor");
return employee -> {
log.info(employee.getName() + " had salary " + employee.getSalary());
Integer salary = employee.getSalary() + 1;
employee.setSalary(salary);
log.info(employee.getName() + " have salary " + employee.getSalary());
return employee;
};
}
@Bean
@StepScope
public ItemWriter<Employee> writer() {
log.info("writer");
return employees -> {
for (Employee employee : employees) {
try {
employeeService.update(employee);
log.info(employee.getName() + " updated with salary " + employee.getSalary());
} catch (ValidationException e) {
e.printStackTrace();
}
}
};
}
@Bean
public MapJobRepositoryFactoryBean jobRepositoryFactoryBean(PlatformTransactionManager transactionManager) {
return new MapJobRepositoryFactoryBean(transactionManager);
}
@Bean
public JobRepository jobRepository(MapJobRepositoryFactoryBean jobRepositoryFactoryBean) throws Exception {
jobRepositoryFactoryBean.setTransactionManager(transactionManager);
return jobRepositoryFactoryBean.getObject();
}
@Bean
public SimpleJobLauncher jobLauncher(JobRepository jobRepository) {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
return jobLauncher;
}
Буду рад любой помощи.