Я новичок в Spring и ищу решение для пакетной пружины для простого процесса ETL. Я перешел по следующей ссылке: https://examples.javacodegeeks.com/enterprise-java/spring/batch/spring-batch-etl-job-example/ для обучения.
Это работает хорошо, и у меня появилась идея базовой структуры c использования подпружиненной партии для ETL.
Далее я хотел получить файл с FTP-сервера как Po C. По сути, вместо использования жестко заданного входного файла я хочу добавить выборку с FTP.
При поиске я понимаю, что мы можем использовать пружинную интеграцию для FTP / SFTP.
Я следовал следующая ссылка: https://coreyreil.wordpress.com/2012/12/21/spring-batch-creating-an-ftp-tasklet-to-get-remote-files/ для создания тасклета, который будет выполнять выборку по FTP / SFTP.
В существующей конфигурации BatchConfiguration. java Я добавил следующее, чтобы добавить свою задачу FTP в качестве самого первого шага.
@Bean
public FtpGetRemoteFilesTasklet myFtpGetRemoteFilesTasklet()
{
FtpGetRemoteFilesTasklet ftpTasklet = new FtpGetRemoteFilesTasklet();
ftpTasklet.setRetryIfNotFound(true);
ftpTasklet.setDownloadFileAttempts(3);
ftpTasklet.setRetryIntervalMilliseconds(10000);
ftpTasklet.setFileNamePattern("README");
//ftpTasklet.setFileNamePattern("TestFile");
ftpTasklet.setRemoteDirectory("/");
ftpTasklet.setLocalDirectory(new File(System.getProperty("java.io.tmpdir")));
ftpTasklet.setSessionFactory(myFtpSessionFactory);
return ftpTasklet;
}
@Bean
public SessionFactory myFtpSessionFactory()
{
DefaultFtpSessionFactory ftpSessionFactory = new DefaultFtpSessionFactory();
ftpSessionFactory.setHost("ftp.gnu.org");
ftpSessionFactory.setClientMode(0);
ftpSessionFactory.setFileType(0);
ftpSessionFactory.setPort(21);
ftpSessionFactory.setUsername("anonymous");
ftpSessionFactory.setPassword("anonymous");
return ftpSessionFactory;
}
И затем изменил для Jobbuilder следующее:
// Configure job step
@Bean
public Job marketPricesETLJob() {
return jobBuilderFactory.get("Market Prices ETL Job")
.incrementer(new RunIdIncrementer())
.listener(listener())
//.flow(etlStep()).end().build();
.start(getFilesFromFTPServer()).on("FAILED").end()
.from(getFilesFromFTPServer()).on("COMPLETED").to(etlStep())
.end()
.build();
}
@Bean
public Step getFilesFromFTPServer() {
return stepBuilderFactory.get("Get File from FTP Server")
.tasklet(myFtpGetRemoteFilesTasklet())
.build();
}
@Bean
public Step etlStep() {
return stepBuilderFactory.get("Extract -> Transform -> Aggregate -> Load").<MarketEvent, Trade> chunk(10000)
.reader(marketEventReader())
.processor(marketEventProcessor())
.writer(stockPriceAggregator())
.build();
}
В основном, используя сайт ftp.gnu.org для чтения файла README, например.
Но я получаю следующую ошибку.
2020-04-28 19:10:19.269 INFO 20380 --- [ main] c.s.demo.SpringBatchDemoApplication : Started SpringBatchDemoApplication in 16.846 seconds (JVM running for 18.903)
2020-04-28 19:10:19.274 INFO 20380 --- [ main] o.s.b.a.b.JobLauncherCommandLineRunner : Running default command line with: []
2020-04-28 19:10:19.748 INFO 20380 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=Market Prices ETL Job]] launched with the following parameters: [{run.id=8}]
2020-04-28 19:10:19.884 INFO 20380 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [Get File from FTP Server]
2020-04-28 19:11:40.046 WARN 20380 --- [l-1 housekeeper] com.zaxxer.hikari.pool.HikariPool : HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=49s792ms777µs228ns).
2020-04-28 19:12:13.603 ERROR 20380 --- [ main] o.s.batch.core.step.AbstractStep : Encountered an error executing step Get File from FTP Server in job Market Prices ETL Job
org.springframework.messaging.MessagingException: Problem occurred while synchronizing 'null' to local directory; nested exception is org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.lang.IllegalStateException: failed to create FTPClient
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:355) ~[spring-integration-file-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:333) ~[spring-integration-file-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at com.springbatch.demo.tasklet.FtpGetRemoteFilesTasklet.execute(FtpGetRemoteFilesTasklet.java:112) ~[classes/:na]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:68) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:68) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:137) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:567) ~[na:na]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) ~[spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at com.sun.proxy.$Proxy62.run(Unknown Source) ~[na:na]
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.execute(JobLauncherCommandLineRunner.java:192) ~[spring-boot-autoconfigure-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.executeLocalJobs(JobLauncherCommandLineRunner.java:166) ~[spring-boot-autoconfigure-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.launchJobFromProperties(JobLauncherCommandLineRunner.java:153) ~[spring-boot-autoconfigure-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.run(JobLauncherCommandLineRunner.java:148) ~[spring-boot-autoconfigure-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:784) ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:768) ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:322) ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at com.springbatch.demo.SpringBatchDemoApplication.main(SpringBatchDemoApplication.java:10) ~[classes/:na]
Caused by: org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.lang.IllegalStateException: failed to create FTPClient
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:446) ~[spring-integration-file-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:348) ~[spring-integration-file-5.2.5.RELEASE.jar:5.2.5.RELEASE]
... 43 common frames omitted
Caused by: java.lang.IllegalStateException: failed to create FTPClient
at org.springframework.integration.ftp.session.AbstractFtpSessionFactory.getSession(AbstractFtpSessionFactory.java:170) ~[spring-integration-ftp-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.ftp.session.AbstractFtpSessionFactory.getSession(AbstractFtpSessionFactory.java:41) ~[spring-integration-ftp-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:432) ~[spring-integration-file-5.2.5.RELEASE.jar:5.2.5.RELEASE]
... 44 common frames omitted
Caused by: java.net.ConnectException: Connection timed out: connect
at java.base/sun.nio.ch.Net.connect0(Native Method) ~[na:na]
at java.base/sun.nio.ch.Net.connect(Net.java:493) ~[na:na]
at java.base/sun.nio.ch.Net.connect(Net.java:482) ~[na:na]
at java.base/sun.nio.ch.NioSocketImpl.connect(NioSocketImpl.java:588) ~[na:na]
at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:339) ~[na:na]
at java.base/java.net.Socket.connect(Socket.java:603) ~[na:na]
at org.apache.commons.net.SocketClient._connect(SocketClient.java:243) ~[commons-net-3.6.jar:3.6]
at org.apache.commons.net.SocketClient.connect(SocketClient.java:202) ~[commons-net-3.6.jar:3.6]
at org.springframework.integration.ftp.session.AbstractFtpSessionFactory.createClient(AbstractFtpSessionFactory.java:193) ~[spring-integration-ftp-5.2.5.RELEASE.jar:5.2.5.RELEASE]
Основная ошибка:
Caused by: java.lang.IllegalStateException: failed to create FTPClient
at org.springframework.integration.ftp.session.AbstractFtpSessionFactory.getSession(AbstractFtpSessionFactory.java:170) ~[spring-integration-ftp-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.ftp.session.AbstractFtpSessionFactory.getSession(AbstractFtpSessionFactory.java:41) ~[spring-integration-ftp-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:432) ~[spring-integration-file-5.2.5.RELEASE.jar:5.2.5.RELEASE]
... 44 common frames omitted
Caused by: java.net.ConnectException: Connection timed out: connect
Я могу подключиться к сайту с моей машины. Также скачайте файл. Как я могу проверить, в чем проблема? Как мне решить эту проблему?
По сути, я ищу хороший пример для процесса ETL, который будет включать следующие основные задачи: 1055 *: 1. Получить файл с удаленного хоста (FTP / SFTP) на локальный хост , 2. Считайте / проанализируйте загруженный файл. 3. Обработать файл (некоторые бизнес логи c). Можно прочитать несколько таких загруженных файлов, объединить их (например, работать с несколькими фреймами данных). 4. Писатель для хранения обработанных результатов в БД (в основном) / любом другом источнике.
. Выше я расскажу о некоторых примеры, чтобы понять, как пружинная партия работает с некоторыми рабочими примерами.
---- Обновление на основе приведенного ниже предложения ----
Да, на основе https://docs.spring.io/spring-integration/api/org/springframework/integration/ftp/session/AbstractFtpSessionFactory.html#setClientMode -int- ; Я пробовал и 0 и 2.
Я проверяю ftpSessionFactory, мне хорошо смотрится на основании того, что я установил.