Spring Integration Ftp remoteFileTemplate зависает при получении файла с ftp - PullRequest
0 голосов
/ 28 января 2019

При использовании RemoteFileTemplate для получения 20 КБ файлов с FTP всегда 15 или 20 файлов застревали и через 10 минут FTP возвращает код ошибки:

  • FTP-ответ 421 получен.Соединение с сервером закрыто.
  • Не удалось получить InputStream для удаленного файла / test / test_file_1245: 425

Конфигурация FTP-сервера:

  • MaxInstances 2000 (Ограничиваетобщее количество подключений)
  • MaxClients 1000 (ограничивает количество подключений для каждого сервера / хоста)

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.2.RELEASE</version>
    </parent>

    <groupId>rabbitmq.listener</groupId>
    <artifactId>listener</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-ftp</artifactId>
        </dependency>
    </dependencies>

</project>

приложение.properties

spring.rabbitmq.listener.type=direct
spring.rabbitmq.listener.direct.default-requeue-rejected=false
spring.rabbitmq.listener.direct.consumers-per-queue=5
spring.rabbitmq.listener.direct.prefetch=5

DemoConfiguration.java

@Configuration
@Slf4j
@EnableRabbit
public class DemoConfiguration {

    @Bean
    public DirectMessageListenerContainer container(final DirectRabbitListenerContainerFactory containerFactory) {
        final DirectMessageListenerContainer listenerContainer = containerFactory.createListenerContainer();
        listenerContainer.setQueueNames("in_queue"); // has 20.000 messages before starting this application
        listenerContainer.setListenerId("listener_in_queue");
        return listenerContainer;
    }

    @Bean
    public SessionFactory<FTPFile> sessionFactory() {
        final DefaultFtpSessionFactory defaultFtpSessionFactory = new DefaultFtpSessionFactory();
        defaultFtpSessionFactory.setClientMode(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
        defaultFtpSessionFactory.setHost("ftp.test.com");
        defaultFtpSessionFactory.setPort(1021);
        defaultFtpSessionFactory.setUsername("test");
        defaultFtpSessionFactory.setPassword("test");
        return new CachingSessionFactory<>(defaultFtpSessionFactory);
    }

    @Bean
    public RemoteFileTemplate<FTPFile> ftpFileRemoteFileTemplate(final SessionFactory<FTPFile> sessionFactory) {
        return new RemoteFileTemplate<>(sessionFactory);
    }

    @Bean
    public IntegrationFlow demoFlow(final DirectMessageListenerContainer container, final RemoteFileTemplate<FTPFile> ftpFileRemoteFileTemplate) {

        return IntegrationFlows.from(Amqp.inboundAdapter(container))
                .handle((payload, headers) -> {
                    final String file = payload.toString();
                    ftpFileRemoteFileTemplate.get(file, stream -> {
                        try {
                            log.info("{}: {}", file, IOUtils.toString(stream, StandardCharsets.UTF_8).length());
                        } finally {
                            stream.close();
                        }
                    });
                    return null;
                })
                .get();
    }

}

файл журнала

2019-01-28 13:37:32.208  WARN 2688 --- [pool-1-thread-14                        ] org.springframework.integration.ftp.session.FtpSession                                               : failed to disconnect FTPClient

org.apache.commons.net.ftp.FTPConnectionClosedException: FTP response 421 received.  Server closed connection.
        at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:388)
        at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:300)
        at org.apache.commons.net.ftp.FTP.getReply(FTP.java:732)
        at org.apache.commons.net.ftp.FTPClient.completePendingCommand(FTPClient.java:1853)
        at org.springframework.integration.ftp.session.FtpSession.finalizeRaw(FtpSession.java:108)
        at org.springframework.integration.ftp.session.FtpSession.close(FtpSession.java:150)
        at org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession.close(CachingSessionFactory.java:208)
        at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:456)
        at org.springframework.integration.file.remote.RemoteFileTemplate.get(RemoteFileTemplate.java:393)
        at rabbitmq.listener.DemoConfiguration.lambda$demoFlow$1(DemoConfiguration.java:58)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:102)
        at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
        at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
        at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$600(AmqpInboundChannelAdapter.java:57)
        at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:237)
        at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:204)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1514)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1440)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1428)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1423)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1372)
        at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:995)
        at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:955)
        at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
        at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

2019-01-28 13:37:32.208  WARN 2688 --- [pool-1-thread-14                        ] org.springframework.integration.ftp.session.FtpSession                                               : failed to disconnect FTPClient

org.apache.commons.net.ftp.FTPConnectionClosedException: FTP response 421 received.  Server closed connection.
        at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:388)
        at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:300)
        at org.apache.commons.net.ftp.FTP.getReply(FTP.java:732)
        at org.apache.commons.net.ftp.FTPClient.completePendingCommand(FTPClient.java:1853)
        at org.springframework.integration.ftp.session.FtpSession.finalizeRaw(FtpSession.java:108)
        at org.springframework.integration.ftp.session.FtpSession.close(FtpSession.java:150)
        at org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession.close(CachingSessionFactory.java:208)
        at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:456)
        at org.springframework.integration.file.remote.RemoteFileTemplate.get(RemoteFileTemplate.java:393)
        at rabbitmq.listener.DemoConfiguration.lambda$demoFlow$1(DemoConfiguration.java:58)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:102)
        at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
        at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
        at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$600(AmqpInboundChannelAdapter.java:57)
        at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:237)
        at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:204)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1514)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1440)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1428)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1423)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1372)
        at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:995)
        at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:955)
        at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
        at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

2019-01-28 13:37:32.212  WARN 2688 --- [pool-1-thread-14                        ] org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler                            : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1613)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1517)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1440)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1428)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1423)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1372)
        at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:995)
        at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:955)
        at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
        at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.io.IOException: Failed to obtain InputStream for remote file /test/test_file_1245: 425
        at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:111)
        at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
        at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
        at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$600(AmqpInboundChannelAdapter.java:57)
        at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:237)
        at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:204)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1514)
        ... 11 common frames omitted
Caused by: org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.io.IOException: Failed to obtain InputStream for remote file /test/test_file_1245: 425
        at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:451)
        at org.springframework.integration.file.remote.RemoteFileTemplate.get(RemoteFileTemplate.java:393)
        at rabbitmq.listener.DemoConfiguration.lambda$demoFlow$1(DemoConfiguration.java:58)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:102)
        ... 29 common frames omitted
Caused by: java.io.IOException: Failed to obtain InputStream for remote file /test/test_file_1245: 425
        at org.springframework.integration.ftp.session.FtpSession.readRaw(FtpSession.java:98)
        at org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession.readRaw(CachingSessionFactory.java:280)
        at org.springframework.integration.file.remote.RemoteFileTemplate.lambda$get$4(RemoteFileTemplate.java:396)
        at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:442)
        ... 36 common frames omitted

2019-01-28 13:37:32.215 ERROR 2688 --- [pool-1-thread-14                        ] org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer                              : Failed to invoke listener

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1613)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1517)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1440)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1428)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1423)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1372)
        at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:995)
        at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:955)
        at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
        at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.io.IOException: Failed to obtain InputStream for remote file /test/test_file_1245: 425
        at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:111)
        at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
        at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
        at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$600(AmqpInboundChannelAdapter.java:57)
        at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:237)
        at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:204)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1514)
        ... 11 common frames omitted
Caused by: org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.io.IOException: Failed to obtain InputStream for remote file /test/test_file_1245: 425
        at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:451)
        at org.springframework.integration.file.remote.RemoteFileTemplate.get(RemoteFileTemplate.java:393)
        at rabbitmq.listener.DemoConfiguration.lambda$demoFlow$1(DemoConfiguration.java:58)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:102)
        ... 29 common frames omitted
Caused by: java.io.IOException: Failed to obtain InputStream for remote file /test/test_file_1245: 425
        at org.springframework.integration.ftp.session.FtpSession.readRaw(FtpSession.java:98)
        at org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession.readRaw(CachingSessionFactory.java:280)
        at org.springframework.integration.file.remote.RemoteFileTemplate.lambda$get$4(RemoteFileTemplate.java:396)
        at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:442)
        ... 36 common frames omitted

- РЕДАКТИРОВАТЬ

Я заменил CachingSessionFactory на DefaultFtpSessionFactory, чтобы избежать проблем с пулами и получить такое же поведение.10 сообщений находились в состоянии unacked на RabbitMQ в течение 10 минут.Через 10 минут я увидел, что файлы были обработаны, и на RabbitMQ больше не было сообщений.Я мог видеть, что на ftp-сервере один сеанс находился в состоянии IDLE в течение 10 минут.

    @Bean
    public SessionFactory<FTPFile> sessionFactory() {
        final DefaultFtpSessionFactory defaultFtpSessionFactory = new DefaultFtpSessionFactory();
        defaultFtpSessionFactory.setClientMode(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
        defaultFtpSessionFactory.setHost("ftp.test.com");
        defaultFtpSessionFactory.setPort(1021);
        defaultFtpSessionFactory.setUsername("test");
        defaultFtpSessionFactory.setPassword("test");
        return defaultFtpSessionFactory;
    }

Мне это не кажется проблемой пула или FTP 421, скорее всего это что-то в DefaultFtpSessionFactory илисам сеанс?Почему эти темы так долго висят?

1 Ответ

0 голосов
/ 28 января 2019

У CachingSessionFactory нет ограничения по умолчанию:

/**
 * Create a CachingSessionFactory with the specified session limit. By default, if
 * no sessions are available in the cache, and the size limit has been reached,
 * calling threads will block until a session is available.
 * <p>
 * Do not cache a {@link DelegatingSessionFactory}, cache each delegate therein instead.
 * @see #setSessionWaitTimeout(long)
 * @see #setPoolSize(int)
 *
 * @param sessionFactory The underlying session factory.
 * @param sessionCacheSize The maximum cache size.
 */
public CachingSessionFactory(SessionFactory<F> sessionFactory, int sessionCacheSize) {

, и мы получим эту конфигурацию в базовом SimplePool:

/**
 * Creates a SimplePool with a specific limit.
 * @param poolSize The maximum number of items the pool supports.
 * @param callback A {@link PoolItemCallback} implementation called during various
 * pool operations.
 */
public SimplePool(int poolSize, PoolItemCallback<T> callback) {
    if (poolSize <= 0) {
        this.poolSize.set(Integer.MAX_VALUE);
        this.targetPoolSize.set(Integer.MAX_VALUE);
        this.permits.release(Integer.MAX_VALUE);
    }
    else {
        this.poolSize.set(poolSize);
        this.targetPoolSize.set(poolSize);
        this.permits.release(poolSize);
    }
    this.callback = callback;
}

Итак,подумайте, чтобы иметь разумный размер кэша для вашего CachingSessionFactory.

...