Я пытаюсь создать поток интеграции:
Connection A : [ ActiveMq (queue1) ---> TCP Server (1111) ](spring boot Application) ---> [ExternalApplication (client connected to Server running on port (1111))] (Application on different Technology (VB))
Connection B : [ ActiveMq (queue2) ---> TCP Server (2222) ](spring boot Application) ---> [ExternalApplication (client connected to Server running on port (2222))] (Application on different Technology (VB))
Итак, вышеупомянутый поток изображает следующий сценарий:
У меня есть приложение весенней загрузки (скажем, springUtility), которое подключается к внешнему приложению (скажем, externalutility) по какой-то другой технологии.
Соединение между springutility и externalUtility осуществляется через TCPпротокол.
Будет иметься более одного соединения между обоими утилитами, как на схеме подключения выше A и соединением B.
Потоксообщения будет от springUtilty к externalutility (в одну сторону)
Теперь externalutility может работать в режиме клиента или в режиме сервера.Итак, если externalUtility запускается в режиме сервера, то моя подпружиненная утилита будет работать в режиме клиента (готово), а также для создания более одного соединения. Я запускаю приведенный ниже код для кода цикла:
Код:
flow = IntegrationFlows
.from(Jms.inboundAdapter(ConnectionFactory())
.destination("rec"))
.channel(directChannel()).handle(Tcp.outboundAdapter(Tcp.nioClient("172.18.11.22",5555)
.serializer(customSerializer)
.deserializer(customSerializer)
.id(hostConnection.getConnectionNumber()).soTimeout(10000)))
.get();
flowRegistration = this.flowContext.registration(flow).id("in.flow").register();
Проблема: при запуске externalutility в режиме клиента утилита Spring должна создать сервер, к которому будет подключаться внешняя утилита, и затем начать получать сообщения от утилиты Spring: что я пытался реализовать неправильноway.
Итак, наконец, я должен реализовать вышеупомянутое соединение в качестве контрольного примера для проверки потока, означает, что утилита spring открыла порт сервера (1111 и 2222), к которому подключается внешнее приложениеэти два порта и начинают получать сообщения от утилиты Spring соответственно из queue1 и queue2.Оба соединения будут иметь отдельный поток (как только будет создан один поток, мы можем использовать цикл for, чтобы создать нужное соединение).
Код:
flow = IntegrationFlows
.from(Jms.inboundAdapter(ConnectionFactory())
.destination("rec"))
.channel(directChannel()).handle(Tcp.outboundAdapter(Tcp.netServer(5555)
.serializer(customSerializer)
.deserializer(customSerializer)
.id(hostConnection.getConnectionNumber()).soTimeout(10000)))
.get();
flowRegistration = this.flowContext.registration(flow).id("in.flow").register();
`код:
2018-05-30 17:39:39.171 INFO 15776 --- [nio-8080-exec-1] o.s.i.endpoint.EventDrivenConsumer : Adding {jms:outbound-channel-adapter} as a subscriber to the 'Conn1311out.flow.channel#0' channel
2018-05-30 17:39:39.172 INFO 15776 --- [nio-8080-exec-1] o.s.integration.channel.DirectChannel : Channel 'application.Conn1311out.flow.channel#0' has 1 subscriber(s).
2018-05-30 17:39:39.172 INFO 15776 --- [nio-8080-exec-1] o.s.i.endpoint.EventDrivenConsumer : started org.springframework.integration.config.ConsumerEndpointFactoryBean#3
2018-05-30 17:39:41.965 INFO 15776 --- [nio-8080-exec-1] o.s.i.endpoint.EventDrivenConsumer : Adding {ip:tcp-outbound-channel-adapter} as a subscriber to the 'Conn1311in.flow.channel#0' channel
2018-05-30 17:39:41.965 INFO 15776 --- [nio-8080-exec-1] o.s.integration.channel.DirectChannel : Channel 'application.Conn1311in.flow.channel#0' has 1 subscriber(s).
2018-05-30 17:39:41.966 INFO 15776 --- [nio-8080-exec-1] .s.i.i.t.c.TcpNetServerConnectionFactory : started Conn1311, port=3333
2018-05-30 17:39:41.966 INFO 15776 --- [nio-8080-exec-1] o.s.i.endpoint.EventDrivenConsumer : started org.springframework.integration.config.ConsumerEndpointFactoryBean#4
2018-05-30 17:39:41.966 INFO 15776 --- [pool-1-thread-1] .s.i.i.t.c.TcpNetServerConnectionFactory : Conn1311, port=3333 No listener bound to server connection factory; will not read; exiting...
2018-05-30 17:39:41.971 INFO 15776 --- [nio-8080-exec-1] o.s.i.e.SourcePollingChannelAdapter : started org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0
Сначала создайте сервер на предоставленном порту, но выйдите со следующим сообщением
No listener bound to server connection factory; will not read; exiting...
Когда я создаюклиент работает нормально, но при создании сервера выдает ошибку
РЕДАКТИРОВАТЬ:
Я использовал следующий подход для создания динамически другого сервера и отправки сообщения подключенному клиенту от его соответствующегоклиент activemq
Я создал входящий адаптер, как показано ниже:
IntegrationFlow flow;
CustomSerializer customSerializer = getCustomSerializer(String.valueOf(hostConnection.getMaxMessageLength()),
hostConnection.getTerminatorChar());
TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
TcpNetServerConnectionFactory cf = new TcpNetServerConnectionFactory(1234));
cf.setSerializer(customSerializer);
cf.setDeserializer(customSerializer);
handler.setConnectionFactory(cf);
flow = IntegrationFlows
.from(Tcp.inboundAdapter(cf).id("adapter")).handle(handler)
.get();
this.flowContext.registration(flow).id("inflow")
.addBean(hostConnection.getConnectionNumber(),cf)
.addBean(hostConnection.getConnectionNumber()+"handler",handler)
.register();
Я создал поток Jms, в котором сообщение из очередиотправляется на маршрутизатор, как показано ниже:
flow = IntegrationFlows
.from(Jms.inboundAdapter(activeMQConnectionFactory)
.destination(hostConnection.getConnectionNumber() +"rec"))
.channel(directChannel()).route(new ServerRouter())
.get();
когда клиент подключился к моему серверу, я создаю поток из прослушивателя handleTcpConnectionCloseEvent(TcpConnectionOpenEvent event)
и создаю поток как:
void createServerFlow(TcpConnectionOpenEvent event) {
String connectionNumber = event.getConnectionFactoryName();
TcpNetConnection server = (TcpNetConnection) event.getSource();
TcpSendingMessageHandler handler = (TcpSendingMessageHandler) ac.getBean(connectionNumber + "handler");
IntegrationFlow flow = f -> f.enrichHeaders(e -> e.header(IpHeaders.CONNECTION_ID, event.getConnectionId()))
.handle(handler);
IntegrationFlowRegistration flowregister = this.flowContext.registration(flow).id("outclient").register();
MessageChannel channel = flowregister.getInputChannel();
this.subFlows.put(connectionNumber + "server", channel);
}
Мой сервер Поиск роутераканал и отправить на целевой канал
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
String serverChannel = (String) message.getHeaders().get("connectionnumber");
MessageChannel channel = HostConnectionRepository.subFlows
.get(serverChannel+"server");
return Collections.singletonList(channel);
}
когда меня отключат, я удаляю поток из flowContext, созданного в step 3
.
Проблема: Когда я удаляю поток, бин, связанный с потоком, также удаляется, как написано в документации, что связанные бины также будут отбрасываться.
Это удаляет мой bean-компонент обработчика, который закрывает адаптер TCP и фактическое соединение, следовательно, как только он отключается, он не может подключиться снова
Как я могу отбросить поток outclientflow без удаления bean-компонента обработчика?