Spring Integration + Spring Batch: работа не останавливается - PullRequest
0 голосов
/ 04 марта 2020

Я хочу прочитать файл с ftp-сервера, затем сохранить его в локальном хранилище и удалить с сервера, запустить задание, которое читает файл, найти одну запись в БД, изменить один параметр и сохранить его.

Что идет не так: работа не заканчивается; увеличивает зарплату и экономит сотруднику много раз.

Конфигурация Spring Integration:

    @Bean
    public FtpInboundFileSynchronizer ftpInboundFileSynchronizer(DefaultFtpSessionFactory sessionFactory) {
        FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(sessionFactory);
        fileSynchronizer.setRemoteDirectory(remoteDirectory);
        fileSynchronizer.setDeleteRemoteFiles(true);
        return fileSynchronizer;
    }

    @Bean
    @InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(cron = "*/5 * * * * ?"))
    public FtpInboundFileSynchronizingMessageSource ftpInboundFileSynchronizingMessageSource(FtpInboundFileSynchronizer fileSynchronizer) throws Exception {
        FtpInboundFileSynchronizingMessageSource messageSource = new FtpInboundFileSynchronizingMessageSource(fileSynchronizer);
        messageSource.setAutoCreateLocalDirectory(true);
        messageSource.setLocalDirectory(new File(localDirectory));
        messageSource.setLocalFilter(new AcceptOnceFileListFilter<>());
        return messageSource;
    }

    @Bean
    @ServiceActivator(inputChannel = "fileInputChannel")
    public FileWritingMessageHandler fileWritingMessageHandler() {
        FileWritingMessageHandler messageHandler = new FileWritingMessageHandler(new File(localDirectory));
        messageHandler.setOutputChannelName("jobLaunchRequestChannel");
        return messageHandler;
    }

    @ServiceActivator(inputChannel = "jobLaunchRequestChannel", outputChannel = "jobLaunchingGatewayChannel")
    public JobLaunchRequest jobLaunchRequest(File file) throws IOException {
        String[] content = FileUtils.readFileToString(file, "UTF-8").split("\\s+");
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("filename", file.getAbsolutePath())
                .addString("id", content[0]).addString("salary", content[1])
//                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        return new JobLaunchRequest(increaseSalaryJob, jobParameters);
    }

    @Bean
    @ServiceActivator(inputChannel = "jobLaunchingGatewayChannel")
    public JobLaunchingGateway jobLaunchingGateway(SimpleJobLauncher jobLauncher) {
        JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);
        jobLaunchingGateway.setOutputChannelName("finish");
        return jobLaunchingGateway;
    }

    @ServiceActivator(inputChannel = "finish")
    public void finish() {
        System.out.println("FINISH");
    }
}

Конфигурация Spring Batch:

 @Bean
    public Job increaseSalaryJob(CustomJobListener listener, Step step1) {
        return jobBuilderFactory.get("increaseSalaryJob")
                .preventRestart()
                .listener(listener)
                .start(step1)
                .build();
    }

    @Bean
    public Step step1(ItemReader<Employee> reader) {
        return stepBuilderFactory.get("step1")
                .transactionManager(transactionManager)
                .<Employee, Employee> chunk(1)
                .reader(reader)
                .processor(processor())
                .writer(writer())
                .build();
    }

    @Bean
    @StepScope
    public ItemReader<Employee> reader(@Value("#{jobParameters[id]}") Integer id) {
        log.info("reader");
        return () -> employeeService.get(id);
    }

    @Bean
    @StepScope
    public ItemProcessor<Employee, Employee> processor() {
        log.info("processor");
        return employee -> {
            log.info(employee.getName() + " had salary " + employee.getSalary());
            Integer salary = employee.getSalary() + 1;
            employee.setSalary(salary);
            log.info(employee.getName() + " have salary " + employee.getSalary());

            return employee;
        };
    }

    @Bean
    @StepScope
    public ItemWriter<Employee> writer() {
        log.info("writer");
        return employees -> {
            for (Employee employee : employees) {
                try {
                    employeeService.update(employee);
                    log.info(employee.getName() + " updated with salary " + employee.getSalary());
                } catch (ValidationException e) {
                    e.printStackTrace();
                }
            }
        };
    }

    @Bean
    public MapJobRepositoryFactoryBean jobRepositoryFactoryBean(PlatformTransactionManager transactionManager) {
        return new MapJobRepositoryFactoryBean(transactionManager);
    }

    @Bean
    public JobRepository jobRepository(MapJobRepositoryFactoryBean jobRepositoryFactoryBean) throws Exception {
        jobRepositoryFactoryBean.setTransactionManager(transactionManager);
        return jobRepositoryFactoryBean.getObject();
    }

    @Bean
    public SimpleJobLauncher jobLauncher(JobRepository jobRepository) {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        return jobLauncher;
    }

Буду рад любой помощи.

1 Ответ

0 голосов
/ 06 марта 2020

Вы должны убедиться, что ваш читатель возвращает null в какой-то момент. Вот как этот шаг интерпретирует, что больше нет данных для обработки и выхода (что, в свою очередь, остановит окружающее задание, если больше нет шагов для выполнения).

Тем не менее, я вижу ввод ваших ориентированный на чанк шаг - единственный id. Для этого варианта использования достаточно простого тасклета, нет необходимости в тасклете, ориентированном на чанк, с одной входной записью и chunkSize=1.

...