Чтение CSV-файла и вставка его в БД с использованием Spring Integration и Batch - PullRequest
0 голосов
/ 28 июня 2019

enter image description hereenter image description here У меня есть приложение, которое отслеживает FTP-папку для определенного файла CSV foo.csv, когда файл находится, он тянет его к моему локальному исгенерируйте новый формат вывода bar.csv, приложение затем отправит новый файл bar.csv обратно в папку FTP и сотрет его с локального компьютера.

Теперь я хочу представить процесс, который будет читать bar.csvи вставьте его в таблицу базы данных перед повторной отправкой на FTP-сервер.

Я предполагаю, что это можно сделать с помощью Spring Batch Integration, но не смог найти способ сделать это.

Нижемой код приложения для справки и предложения.

public IntegrationFlow localToFtpFlow(Branch myBranch) {

    return IntegrationFlows.from(Files.inboundAdapter(new File(myBranch.getBranchCode()))
                    .filter(new ChainFileListFilter<File>()
                            .addFilter(new RegexPatternFileListFilter("final" + myBranch.getBranchCode() + ".csv"))
                            .addFilter(new FileSystemPersistentAcceptOnceFileListFilter(metadataStore(dataSource), "foo"))),//FileSystemPersistentAcceptOnceFileListFilter
            e -> e.poller(Pollers.fixedDelay(10_000)))
            .enrichHeaders(h ->h.headerExpression("file_originalFile", "new java.io.File('"+ myBranch.getBranchCode() +"/FEFOexport" + myBranch.getBranchCode() + ".csv')",true))
            .transform(p -> {
                LOG.info("Sending file " + p + " to FTP branch " + myBranch.getBranchCode());
                return p;
            })

            .log()
            .transform(m -> {
                        this.defaultSessionFactoryLocator.addSessionFactory(myBranch.getBranchCode(),createNewFtpSessionFactory(myBranch));
                        LOG.info("Adding factory to delegation");
                        return m;
            })

            .publishSubscribeChannel(s ->
                    s.subscribe(f -> f.transform(fileMessageToJobRequest()).handle(jobLaunchingGateway()).channel("nullChannel"))
                    .subscribe(h -> h.handle(Ftp.outboundAdapter(createNewFtpSessionFactory(myBranch), FileExistsMode.REPLACE)
                                     .useTemporaryFileName(true)
                                     .autoCreateDirectory(false)
                                     .remoteDirectory(myBranch.getFolderPath()), e -> e.advice(expressionAdvice()))))
            /*.handle(Ftp.outboundAdapter(createNewFtpSessionFactory(myBranch), FileExistsMode.REPLACE)
                    .useTemporaryFileName(true)
                    .autoCreateDirectory(false)
                    .remoteDirectory(myBranch.getFolderPath()), e -> e.advice(expressionAdvice()))*/
            .get();
}

@Bean
public FileMessageToJobRequest fileMessageToJobRequest(){
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setFileParameterName("input.file.name");
    fileMessageToJobRequest.setJob(orderJob);
    return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
    //simpleJobLauncher.setJobRepository(jobRepository);
    simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);

    return jobLaunchingGateway;
}

Файловое сообщение для класса задания

public class FileMessageToJobRequest {

    private Job job;
    private String fileParameterName;

    public void setFileParameterName(String fileParameterName) {
        this.fileParameterName = fileParameterName;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder =
                new JobParametersBuilder();

        jobParametersBuilder.addString(fileParameterName,
                message.getPayload().getAbsolutePath());//message.getPayload().getAbsolutePath()

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }
}

Ошибка при добавлении потока во время выполнения.

2019-07-02 12:42:49.292  INFO 7476 --- [ask-scheduler-4] o.s.i.file.FileReadingMessageSource      : Created message: [GenericMessage [payload=BEY\finalBEY.csv, headers={file_originalFile=BEY\finalBEY.csv, id=38827020-c9c1-aa35-526c-cc6848ca5e11, file_name=finalBEY.csv, file_relativePath=finalBEY.csv, timestamp=1562060569292}]]
2019-07-02 12:42:49.292  INFO 7476 --- [ask-scheduler-4] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=BEY\finalBEY.csv, headers={file_originalFile=BEY\FEFOexportBEY.csv, id=abbe6169-7f46-fcb9-be9e-44533ab05ec8, file_name=finalBEY.csv, file_relativePath=finalBEY.csv, timestamp=1562060569292}]
2019-07-02 12:42:49.295 ERROR 7476 --- [ask-scheduler-4] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [jobLaunchingGateway]; nested exception is java.lang.NullPointerException, failedMessage=GenericMessage [payload=JobLaunchRequest: orderJob, parameters={input.file.name=C:\Java Programs\spring4ftpappftp\BEY\finalBEY.csv}, headers={file_originalFile=BEY\FEFOexportBEY.csv, id=f7b2560e-e435-009b-2b8e-27ef168a6767, file_name=finalBEY.csv, file_relativePath=finalBEY.csv, timestamp=1562060569293}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:184)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:175)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:224)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:180)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:220)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:277)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.lambda$run$0(AbstractPollingEndpoint.java:378)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:53)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:372)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:98)
    at org.springframework.batch.integration.launch.JobLaunchingMessageHandler.launch(JobLaunchingMessageHandler.java:50)
    at org.springframework.batch.integration.launch.JobLaunchingGateway.handleRequestMessage(JobLaunchingGateway.java:76)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    ... 100 more

1 Ответ

2 голосов
/ 28 июня 2019

Вы должны рассмотреть возможность использования publishSubscribeChannel() в вашем IntegtrationFlow определении. Поставьте в качестве одного подписчика свой .handle(Ftp.outboundAdapter()) (лучше вторым). И как первый это должно быть что-то вроде .handle(jobLaunchingGateway).channel("nullChannel").

Вы можете прочитать о JobLaunchingGateway в Spring Batch Справочное руководство .

Дело в том, что вы хотите отправить одно и то же сообщение в несколько мест. Итак, PublishSubscribeChannel - лучший путь: https://docs.spring.io/spring-integration/docs/5.1.6.RELEASE/reference/html/#java-dsl-subflows

Я предлагаю вам иметь этого подписчика именно в таком порядке, поскольку вы действительно хотите сохранить его в БД перед отправкой на FTP. Без executor второй подписчик будет ждать, пока первый не завершит свою работу.

То, что .channel("nullChannel") необходимо там, потому что JobLaunchingGateway действительно является шлюзом и возвращает JobExecution в качестве ответа. Поскольку вас это не интересует, вам просто нужно игнорировать. Конечно, у вас может быть еще один handle() после этого шлюза, чтобы каким-то образом обработать этот JobExecution. Дело в том, что не нужно ничего возвращать от первого подписчика в качестве ответа. Это как-то затормозит ваш основной поток.

UPDATE

Я думаю, .transform(fileMessageToJobRequest()) должен перейти к первому подписчику ниже, перед jobLaunchingGateway:

 .publishSubscribeChannel(s ->
                s.subscribe(f -> f.transform(fileMessageToJobRequest()).handle(jobLaunchingGateway()).channel("nullChannel"))
                .subscribe(h -> h.handle(Ftp.outboundAdapter(createNewFtpSessionFactory(myBranch), ...))

Дело в том, что вы хотели бы отправить тот же файл на следующий handle(), но после этого преобразователя в восходящем направлении он будет изменен на JobLaunchRequest, что хорошо для jobLaunchingGateway, но не для Ftp.outboundAdapter().

Ваше исключение:

Caused by: java.lang.NullPointerException
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:98)

приводит к этому коду: Assert.notNull(jobParameters, "The JobParameters must not be null.");. Итак, каким-то образом jobParametersBuilder.toJobParameters() оценивается как null.

Я не могу помочь вам с таким большим количеством пользовательского кода.

UPDATE2

OK. Похоже, ваша проблема в определении бина JobLaunchingGateway. Вы делаете явное new SimpleJobLauncher() без JobRepository инъекции.

Похоже, в BatchConfigurerConfiguration Spring Boot есть BasicBatchConfigurer, который можно вставить в этот JobLaunchingGateway. Поэтому мы передадим NPE. Потом есть другие ошибки, но это уже другая история ...

...