При использовании RemoteFileTemplate для получения 20 КБ файлов с FTP всегда 15 или 20 файлов застревали и через 10 минут FTP возвращает код ошибки:
- FTP-ответ 421 получен.Соединение с сервером закрыто.
- Не удалось получить InputStream для удаленного файла / test / test_file_1245: 425
Конфигурация FTP-сервера:
- MaxInstances 2000 (Ограничиваетобщее количество подключений)
- MaxClients 1000 (ограничивает количество подключений для каждого сервера / хоста)
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.2.RELEASE</version>
</parent>
<groupId>rabbitmq.listener</groupId>
<artifactId>listener</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ftp</artifactId>
</dependency>
</dependencies>
</project>
приложение.properties
spring.rabbitmq.listener.type=direct
spring.rabbitmq.listener.direct.default-requeue-rejected=false
spring.rabbitmq.listener.direct.consumers-per-queue=5
spring.rabbitmq.listener.direct.prefetch=5
DemoConfiguration.java
@Configuration
@Slf4j
@EnableRabbit
public class DemoConfiguration {
@Bean
public DirectMessageListenerContainer container(final DirectRabbitListenerContainerFactory containerFactory) {
final DirectMessageListenerContainer listenerContainer = containerFactory.createListenerContainer();
listenerContainer.setQueueNames("in_queue"); // has 20.000 messages before starting this application
listenerContainer.setListenerId("listener_in_queue");
return listenerContainer;
}
@Bean
public SessionFactory<FTPFile> sessionFactory() {
final DefaultFtpSessionFactory defaultFtpSessionFactory = new DefaultFtpSessionFactory();
defaultFtpSessionFactory.setClientMode(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
defaultFtpSessionFactory.setHost("ftp.test.com");
defaultFtpSessionFactory.setPort(1021);
defaultFtpSessionFactory.setUsername("test");
defaultFtpSessionFactory.setPassword("test");
return new CachingSessionFactory<>(defaultFtpSessionFactory);
}
@Bean
public RemoteFileTemplate<FTPFile> ftpFileRemoteFileTemplate(final SessionFactory<FTPFile> sessionFactory) {
return new RemoteFileTemplate<>(sessionFactory);
}
@Bean
public IntegrationFlow demoFlow(final DirectMessageListenerContainer container, final RemoteFileTemplate<FTPFile> ftpFileRemoteFileTemplate) {
return IntegrationFlows.from(Amqp.inboundAdapter(container))
.handle((payload, headers) -> {
final String file = payload.toString();
ftpFileRemoteFileTemplate.get(file, stream -> {
try {
log.info("{}: {}", file, IOUtils.toString(stream, StandardCharsets.UTF_8).length());
} finally {
stream.close();
}
});
return null;
})
.get();
}
}
файл журнала
2019-01-28 13:37:32.208 WARN 2688 --- [pool-1-thread-14 ] org.springframework.integration.ftp.session.FtpSession : failed to disconnect FTPClient
org.apache.commons.net.ftp.FTPConnectionClosedException: FTP response 421 received. Server closed connection.
at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:388)
at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:300)
at org.apache.commons.net.ftp.FTP.getReply(FTP.java:732)
at org.apache.commons.net.ftp.FTPClient.completePendingCommand(FTPClient.java:1853)
at org.springframework.integration.ftp.session.FtpSession.finalizeRaw(FtpSession.java:108)
at org.springframework.integration.ftp.session.FtpSession.close(FtpSession.java:150)
at org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession.close(CachingSessionFactory.java:208)
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:456)
at org.springframework.integration.file.remote.RemoteFileTemplate.get(RemoteFileTemplate.java:393)
at rabbitmq.listener.DemoConfiguration.lambda$demoFlow$1(DemoConfiguration.java:58)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:102)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$600(AmqpInboundChannelAdapter.java:57)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:237)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:204)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1514)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1440)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1428)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1423)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1372)
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:995)
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:955)
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2019-01-28 13:37:32.208 WARN 2688 --- [pool-1-thread-14 ] org.springframework.integration.ftp.session.FtpSession : failed to disconnect FTPClient
org.apache.commons.net.ftp.FTPConnectionClosedException: FTP response 421 received. Server closed connection.
at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:388)
at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:300)
at org.apache.commons.net.ftp.FTP.getReply(FTP.java:732)
at org.apache.commons.net.ftp.FTPClient.completePendingCommand(FTPClient.java:1853)
at org.springframework.integration.ftp.session.FtpSession.finalizeRaw(FtpSession.java:108)
at org.springframework.integration.ftp.session.FtpSession.close(FtpSession.java:150)
at org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession.close(CachingSessionFactory.java:208)
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:456)
at org.springframework.integration.file.remote.RemoteFileTemplate.get(RemoteFileTemplate.java:393)
at rabbitmq.listener.DemoConfiguration.lambda$demoFlow$1(DemoConfiguration.java:58)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:102)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$600(AmqpInboundChannelAdapter.java:57)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:237)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:204)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1514)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1440)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1428)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1423)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1372)
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:995)
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:955)
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2019-01-28 13:37:32.212 WARN 2688 --- [pool-1-thread-14 ] org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1613)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1517)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1440)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1428)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1423)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1372)
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:995)
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:955)
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.io.IOException: Failed to obtain InputStream for remote file /test/test_file_1245: 425
at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:111)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$600(AmqpInboundChannelAdapter.java:57)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:237)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:204)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1514)
... 11 common frames omitted
Caused by: org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.io.IOException: Failed to obtain InputStream for remote file /test/test_file_1245: 425
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:451)
at org.springframework.integration.file.remote.RemoteFileTemplate.get(RemoteFileTemplate.java:393)
at rabbitmq.listener.DemoConfiguration.lambda$demoFlow$1(DemoConfiguration.java:58)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:102)
... 29 common frames omitted
Caused by: java.io.IOException: Failed to obtain InputStream for remote file /test/test_file_1245: 425
at org.springframework.integration.ftp.session.FtpSession.readRaw(FtpSession.java:98)
at org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession.readRaw(CachingSessionFactory.java:280)
at org.springframework.integration.file.remote.RemoteFileTemplate.lambda$get$4(RemoteFileTemplate.java:396)
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:442)
... 36 common frames omitted
2019-01-28 13:37:32.215 ERROR 2688 --- [pool-1-thread-14 ] org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer : Failed to invoke listener
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1613)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1517)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1440)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1428)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1423)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1372)
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:995)
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:955)
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.io.IOException: Failed to obtain InputStream for remote file /test/test_file_1245: 425
at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:111)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$600(AmqpInboundChannelAdapter.java:57)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:237)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:204)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1514)
... 11 common frames omitted
Caused by: org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.io.IOException: Failed to obtain InputStream for remote file /test/test_file_1245: 425
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:451)
at org.springframework.integration.file.remote.RemoteFileTemplate.get(RemoteFileTemplate.java:393)
at rabbitmq.listener.DemoConfiguration.lambda$demoFlow$1(DemoConfiguration.java:58)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:102)
... 29 common frames omitted
Caused by: java.io.IOException: Failed to obtain InputStream for remote file /test/test_file_1245: 425
at org.springframework.integration.ftp.session.FtpSession.readRaw(FtpSession.java:98)
at org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession.readRaw(CachingSessionFactory.java:280)
at org.springframework.integration.file.remote.RemoteFileTemplate.lambda$get$4(RemoteFileTemplate.java:396)
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:442)
... 36 common frames omitted
- РЕДАКТИРОВАТЬ
Я заменил CachingSessionFactory
на DefaultFtpSessionFactory
, чтобы избежать проблем с пулами и получить такое же поведение.10 сообщений находились в состоянии unacked
на RabbitMQ в течение 10 минут.Через 10 минут я увидел, что файлы были обработаны, и на RabbitMQ больше не было сообщений.Я мог видеть, что на ftp-сервере один сеанс находился в состоянии IDLE в течение 10 минут.
@Bean
public SessionFactory<FTPFile> sessionFactory() {
final DefaultFtpSessionFactory defaultFtpSessionFactory = new DefaultFtpSessionFactory();
defaultFtpSessionFactory.setClientMode(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
defaultFtpSessionFactory.setHost("ftp.test.com");
defaultFtpSessionFactory.setPort(1021);
defaultFtpSessionFactory.setUsername("test");
defaultFtpSessionFactory.setPassword("test");
return defaultFtpSessionFactory;
}
Мне это не кажется проблемой пула или FTP 421, скорее всего это что-то в DefaultFtpSessionFactory
илисам сеанс?Почему эти темы так долго висят?