Остановка адаптера TCP с помощью Client ConnectionFactory - PullRequest
0 голосов
/ 04 июня 2018

Я пытаюсь остановить адаптер TCP с фабрикой клиентских подключений с помощью команды шины управления, т. Е. @AdapterID.stop() Код для создания потока приведен ниже:

 IntegrationFlow flow = IntegrationFlows.from(Tcp.inboundAdapter(Tcp.nioClient("127.0.0.1",3226))
             .serializer(customSerializer)
             .deserializer(customSerializer)
             .id("abc")).clientMode(true).retryInterval(1000).errorChannel("testChannel"))
             .enrichHeaders(f->f.header("abc","abc"))
             .channel(directChannel())
             .handle(Jms.outboundAdapter(ConnectionFactory())
             .destination(hostConnection.getConnectionNumber()))
             .get();

     theFlow = this.flowContext.registration(flow).id("out.flow").register(); 

Когда я останавливаю адаптер с помощью шины управлениякак:

 public String stopConnectionAdapter(String connectionName) {
    MessageChannel controlChannel = ac.getBean("controlBus.input", MessageChannel.class); 
    String exp = "@"+connectionName+".stop()";
    controlChannel.send(new GenericMessage<String>(exp));
    return "STOPPED";
}

Адаптер остановлен, но на консоли всегда печатается следующее исключение

2018-06-04 11:09:45.551 ERROR 34860 --- [ask-scheduler-7] o.s.i.i.t.c.ClientModeConnectionManager  : Could not establish connection using conncr3, host=127.0.0.1, port=3226

java.io.IOException: conncr3, host=127.0.0.1, port=3226 connection factory has not been started
at org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory.checkActive(AbstractConnectionFactory.java:873) ~[spring-integration-ip-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory.checkActive(TcpNioClientConnectionFactory.java:68) ~[spring-integration-ip-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory.getConnection(AbstractClientConnectionFactory.java:68) ~[spring-integration-ip-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory.getConnection(AbstractClientConnectionFactory.java:33) ~[spring-integration-ip-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.integration.ip.tcp.connection.ClientModeConnectionManager.run(ClientModeConnectionManager.java:55) ~[spring-integration-ip-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) [spring-context-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_111]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_111]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_111]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_111]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_111]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_111]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]

Как я могу остановить эту ошибку и закрыть соединение без каких-либо исключений и ошибок.

Кроме того, каков эффективный способ обработки (запуска и остановки во время выполнения) этого соединения (компонентов потока интеграции) последовательным образом без каких-либо ошибок?

Ответы [ 2 ]

0 голосов
/ 04 июня 2018

Вы не должны останавливать фабрику;его жизненный цикл контролируется адаптером;просто остановите адаптер;нет необходимости останавливать другие компоненты.

Вы также можете stop() весь компонент IntegrationFlow (out.flow), и он остановит компоненты.

0 голосов
/ 04 июня 2018

.clientMode(true) является виновником здесь.Я пытаюсь остановить ConnectionFactory с идентификатором "abc" в определении потока .id("abc")).clientMode(true).retryInterval(1000).errorChannel("testChannel"))

Теперь я добавил идентификатор для InboundAdapter, как показано ниже: .id("abc")).clientMode(true).retryInterval(1000).errorChannel("testChannel").id("test"))

И с помощью элемента управлениясначала закройте шину connectionFactory, а затем адаптер как @abc.stop() @test.stop(). Вышеуказанная ошибка будет устранена.

Но я до сих пор не знаю подходящего способа эффективной остановки IntegrationFlow, т. е. я должен предоставитьуникальный идентификатор для каждого компонента и остановка один за другим с помощью команды шины управления.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...