Возможная проблема с каналом «Публикация подписки» - PullRequest
1 голос
/ 11 июля 2019

У меня есть приложение, которое отслеживает FTP-папку для определенного файла csv foo.csv, после того, как файл найден, он тянет его к моему локальному и генерирует новый выходной формат bar.csv, приложение затем отправит новый файл finalBEY.csv вернуться в папку FTP и стереть ее из локальной сети.

Теперь, когда я представил процесс, используя publishSubscribeChannel, он преобразует файл в сообщение, а затем использует jobLaunchingGateway, который будет читать finalBEY.csv, используяпакетировать и распечатать его на консоль, он не работает, так как finalBEY.csv удаляется из локального forlder после отправки его обратно на FTP, я использую .channel("nullChannel") на jobLaunchingGateway в первом subscribe, которыйпредположим, что он удерживается до получения ответа от пакета, а затем переходит к следующему subscribe, который отправит его на ftp и удалит его из локального, но, похоже, это не тот случай, когда он удаляет его из локального и, таким образом,Пакет не находит finalBEY.csv и выдает ошибку, которую я вставляю ниже с кодом.

Если я удаляю совет из второго subscribe, он работает fКроме того, это больше не удалит его из локальной сети.

Не могли бы вы помочь в этом вопросе?

public IntegrationFlow localToFtpFlow(Branch myBranch) {

        return IntegrationFlows.from(Files.inboundAdapter(new File(myBranch.getBranchCode()))
                        .filter(new ChainFileListFilter<File>()
                                .addFilter(new RegexPatternFileListFilter("final" + myBranch.getBranchCode() + ".csv"))
                                .addFilter(new FileSystemPersistentAcceptOnceFileListFilter(metadataStore(dataSource), "foo"))),//FileSystemPersistentAcceptOnceFileListFilter
                e -> e.poller(Pollers.fixedDelay(10_000)))
                .enrichHeaders(h ->h.headerExpression("file_originalFile", "new java.io.File('"+ myBranch.getBranchCode() +"/FEFOexport" + myBranch.getBranchCode() + ".csv')",true))
                .transform(p -> {
                    LOG.info("Sending file " + p + " to FTP branch " + myBranch.getBranchCode());
                    return p;
                })

                .log()
                .transform(m -> {
                            this.defaultSessionFactoryLocator.addSessionFactory(myBranch.getBranchCode(),createNewFtpSessionFactory(myBranch));
                            LOG.info("Adding factory to delegation");
                            return m;
                })
                .publishSubscribeChannel(s ->
                        s.subscribe(f ->f.transform(fileMessageToJobRequest())
                                        .handle(jobLaunchingGateway()).channel("nullChannel"))
                        .subscribe(h -> h.handle(Ftp.outboundAdapter(createNewFtpSessionFactory(myBranch), FileExistsMode.REPLACE)
                                         .useTemporaryFileName(true)
                                         .autoCreateDirectory(false)
                                         .remoteDirectory(myBranch.getFolderPath()), e -> e.advice(expressionAdvice()))))

                .get();
    }

    @Bean
    public FileMessageToJobRequest fileMessageToJobRequest(){
        FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
        fileMessageToJobRequest.setFileParameterName("file_path");
        fileMessageToJobRequest.setJob(orderJob);
        return fileMessageToJobRequest;
    }

    @Bean
    public JobLaunchingGateway jobLaunchingGateway() {
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        simpleJobLauncher.setJobRepository(jobRepository);
        simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
        JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);

        return jobLaunchingGateway;
    }

    /**
    * Creating the advice for routing the payload of the outbound message on different expressions (success, failure)
    * @return Advice
    */

    @Bean
    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("success.input");
        advice.setOnSuccessExpressionString("payload.delete() + ' was successful'");
        //advice.setOnSuccessExpressionString("inputMessage.headers['file_originalFile'].renameTo(new java.io.File(payload.absolutePath + '.success.to.send'))");
        //advice.setFailureChannelName("failure.input");
        advice.setOnFailureExpressionString("payload + ' was bad, with reason: ' + #exception.cause.message");
        advice.setTrapException(true);
        return advice;
    }

Вот ошибка, и, как вы видите, первые строки показывают, что этоперешел на FTP и затем инициировал пакетное задание, в то время как при подписке следует сначала выполнить пакетное задание ...

 INFO 10452 --- [ask-scheduler-2] o.s.integration.ftp.session.FtpSession   : File has been successfully renamed from: /ftp/erbranch/EDMS/FEFO/finalBEY.csv.writing to /ftp/erbranch/EDMS/FEFO/finalBEY.csv

Caused by: java.lang.IllegalStateException: Input resource must exist (reader is in 'strict' mode): file [C:\Java Programs\spring4ftpappftp\BEY\finalBEY.csv]
    at org.springframework.batch.item.file.FlatFileItemReader.doOpen(FlatFileItemReader.java:251) ~[spring-batch-infrastructure-4.0.1.RELEASE.jar:4.0.1.RELEASE]
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:146) ~[spring-batch-infrastructure-4.0.1.RELEASE.jar:4.0.1.RELEASE]
    ... 123 common frames omitted

Код отладки:

2019-07-15 10:43:02.838  INFO 4280 --- [ask-scheduler-2] o.s.integration.ftp.session.FtpSession   : File has been successfully transferred to: /ftp/erbranch/EDMS/FEFO/finalBEY.csv.writing
2019-07-15 10:43:02.845  INFO 4280 --- [ask-scheduler-2] o.s.integration.ftp.session.FtpSession   : File has been successfully renamed from: /ftp/erbranch/EDMS/FEFO/finalBEY.csv.writing to /ftp/erbranch/EDMS/FEFO/finalBEY.csv
2019-07-15 10:43:02.845 DEBUG 4280 --- [ask-scheduler-2] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'integrationEvaluationContext'
2019-07-15 10:43:02.848 DEBUG 4280 --- [ask-scheduler-2] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'success.input'
2019-07-15 10:43:02.849 DEBUG 4280 --- [ask-scheduler-2] o.s.integration.channel.DirectChannel    : preSend on channel 'success.input', message: AdviceMessage [payload=true was successful, headers={id=eca55e1d-918e-3334-afce-66f8ab650748, timestamp=1563176582848}, inputMessage=GenericMessage [payload=BEY\finalBEY.csv, headers={file_originalFile=BEY\FEFOexportBEY.csv, id=a2f029b0-2609-1a11-67ef-4f56c7dd0752, file_name=finalBEY.csv, file_relativePath=finalBEY.csv, timestamp=1563176582787}]]
2019-07-15 10:43:02.849 DEBUG 4280 --- [ask-scheduler-2] o.s.i.t.MessageTransformingHandler       : success.org.springframework.integration.transformer.MessageTransformingHandler#0 received message: AdviceMessage [payload=true was successful, headers={id=eca55e1d-918e-3334-afce-66f8ab650748, timestamp=1563176582848}, 

inputMessage=GenericMessage [payload=BEY\finalBEY.csv, headers={file_originalFile=BEY\FEFOexportBEY.csv, id=a2f029b0-2609-1a11-67ef-4f56c7dd0752, file_name=finalBEY.csv, file_relativePath=finalBEY.csv, timestamp=1563176582787}]]

    2019-07-15 10:43:02.951 DEBUG 4280 --- [ask-scheduler-2] o.s.b.i.launch.JobLaunchingGateway       : jobLaunchingGateway received message: GenericMessage [payload=JobLaunchRequest: orderJob, parameters={file_path=C:\Java Programs\spring4ftpappftp\BEY\finalBEY.csv, dummy=1563176582946}, headers={file_originalFile=BEY\FEFOexportBEY.csv, id=c98ad6cb-cced-c911-1b93-9d054baeb9d0, file_name=finalBEY.csv, file_relativePath=finalBEY.csv, timestamp=1563176582951}]

2019-07-16 08:35:29.442  INFO 10208 --- [nio-8081-exec-3] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.FTPOutp' has 1 subscriber(s).
2019-07-16 08:35:29.442  INFO 10208 --- [nio-8081-exec-3] o.s.i.endpoint.EventDrivenConsumer       : started 1o.org.springframework.integration.config.ConsumerEndpointFactoryBean#3
2019-07-16 08:35:29.442  INFO 10208 --- [nio-8081-exec-3] o.s.i.endpoint.EventDrivenConsumer       : Adding {message-handler} as a subscriber to the '1o.subFlow#1.channel#0' channel
2019-07-16 08:35:29.442  INFO 10208 --- [nio-8081-exec-3] o.s.integration.channel.DirectChannel    : Channel 'application.1o.subFlow#1.channel#0' has 1 subscriber(s).
2019-07-16 08:35:29.442  INFO 10208 --- [nio-8081-exec-3] o.s.i.endpoint.EventDrivenConsumer       : started 1o.subFlow#1.org.springframework.integration.config.ConsumerEndpointFactoryBean#1
2019-07-16 08:35:29.442  INFO 10208 --- [nio-8081-exec-3] o.s.i.endpoint.EventDrivenConsumer       : Adding {bridge} as a subscriber to the 'FTPOutp' channel
2019-07-16 08:35:29.442  INFO 10208 --- [nio-8081-exec-3] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.FTPOutp' has 2 subscriber(s).

1 Ответ

0 голосов
/ 11 июля 2019

Поскольку вы используете SyncTaskExecutor, пакетное задание должно выполняться в вызывающем потоке, а затем за ним следует FTP-адаптер.

Используйте протоколирование отладки и следите за потоком сообщений, чтобы понять, почему этого не происходит.

...