Вы должны рассмотреть возможность использования publishSubscribeChannel()
в вашем IntegtrationFlow
определении. Поставьте в качестве одного подписчика свой .handle(Ftp.outboundAdapter())
(лучше вторым). И как первый это должно быть что-то вроде .handle(jobLaunchingGateway).channel("nullChannel")
.
Вы можете прочитать о JobLaunchingGateway
в Spring Batch Справочное руководство .
Дело в том, что вы хотите отправить одно и то же сообщение в несколько мест. Итак, PublishSubscribeChannel
- лучший путь: https://docs.spring.io/spring-integration/docs/5.1.6.RELEASE/reference/html/#java-dsl-subflows
Я предлагаю вам иметь этого подписчика именно в таком порядке, поскольку вы действительно хотите сохранить его в БД перед отправкой на FTP. Без executor
второй подписчик будет ждать, пока первый не завершит свою работу.
То, что .channel("nullChannel")
необходимо там, потому что JobLaunchingGateway
действительно является шлюзом и возвращает JobExecution
в качестве ответа. Поскольку вас это не интересует, вам просто нужно игнорировать. Конечно, у вас может быть еще один handle()
после этого шлюза, чтобы каким-то образом обработать этот JobExecution
. Дело в том, что не нужно ничего возвращать от первого подписчика в качестве ответа. Это как-то затормозит ваш основной поток.
UPDATE
Я думаю, .transform(fileMessageToJobRequest())
должен перейти к первому подписчику ниже, перед jobLaunchingGateway
:
.publishSubscribeChannel(s ->
s.subscribe(f -> f.transform(fileMessageToJobRequest()).handle(jobLaunchingGateway()).channel("nullChannel"))
.subscribe(h -> h.handle(Ftp.outboundAdapter(createNewFtpSessionFactory(myBranch), ...))
Дело в том, что вы хотели бы отправить тот же файл на следующий handle()
, но после этого преобразователя в восходящем направлении он будет изменен на JobLaunchRequest
, что хорошо для jobLaunchingGateway
, но не для Ftp.outboundAdapter()
.
Ваше исключение:
Caused by: java.lang.NullPointerException
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:98)
приводит к этому коду: Assert.notNull(jobParameters, "The JobParameters must not be null.");
. Итак, каким-то образом jobParametersBuilder.toJobParameters()
оценивается как null
.
Я не могу помочь вам с таким большим количеством пользовательского кода.
UPDATE2
OK. Похоже, ваша проблема в определении бина JobLaunchingGateway
. Вы делаете явное new SimpleJobLauncher()
без JobRepository
инъекции.
Похоже, в BatchConfigurerConfiguration
Spring Boot есть BasicBatchConfigurer
, который можно вставить в этот JobLaunchingGateway
. Поэтому мы передадим NPE. Потом есть другие ошибки, но это уже другая история ...