Я пытаюсь создать поток для сообщения следующим образом:
TCPinboundAdapter ----> Брокер сообщений (ActiveMQ)
Поток:
Этот поток создается следующим образом
- Сообщение получено через TCP-соединение с адаптером TCP, который может быть клиентом или сервером.
- СообщениеПолученный на TCP-адаптер отправляется на JMS-адаптер (ActiveMQ Broker).
Код выглядит следующим образом:
@EventListener
public void handleTcpConnectionClientEvent(TcpConnectionFailedEvent event){
TcpNioClientConnectionFactory tcp = (TcpNioClientConnectionFactory)event.getSource();
System.out.println(tcp);
System.out.println("connection exception client :::"+event.getSource());
this.status = event.toString();
}
@EventListener
public void handleTcpConnectionServerExceptionEvent(TcpConnectionServerExceptionEvent event){
System.out.println("connection exception server :::");
this.status = event.toString();
}
// this method is invoked when the connection with the sever got disconnected
@EventListener
public void handleTcpConnectionServerEvent(TcpConnectionExceptionEvent event){
System.out.println("connection exception serversssss :::"+event.getConnectionFactoryName());
this.status = event.toString();
}
//when the connection got established (not for first time)
@EventListener
public void handleTcpConnectionCloseEvent(TcpConnectionOpenEvent event){
System.out.println("connection opened :::"+event.getConnectionFactoryName());
// status = event.toString();
}
// create a server connection and flow to JMS
private void createServerConnection(HostConnection hostConnection) throws Throwable{
this.status = "success";
// IntegrationFlow flow;
IntegrationFlowRegistration theFlow;
IntegrationFlow flow =
IntegrationFlows.from(Tcp.inboundAdapter(Tcp.netServer(1234)
.serializer(customSerializer)
.deserializer(customSerializer)
.id(hostConnection.getConnectionNumber()).soTimeout(10000)))
.enrichHeaders(f->f.header("abc","abc")))
.channel(directChannel())
.handle(Jms.outboundAdapter(ConnectionFactory())
.destination("jmsInbound"))
.get();
theFlow = this.flowContext.registration(flow).id("test.flow").register();
if(this.status.equals("success"))
createInboundFlow(hostConnection);
// startConnection(hostConnection.getConnectionNumber());
}
Проблема:
Этот поток успешно создан и зарегистрирован в контексте приложения, когда нет исключений.Но в случае, когда есть исключение, т. Е. (BindException)
Когда создается сервер для определенного порта, и порт уже используется, тогда это вызывает BindException, тогда также регистрируется поток. Итак, мыхотите, чтобы поток не регистрировался, когда есть исключение в любом из компонентов потока ниже.
IntegrationFlowRegistration theFlow;
IntegrationFlow flow =
IntegrationFlows.from(Tcp.inboundAdapter(Tcp.netServer(1234)
.serializer(customSerializer)
.deserializer(customSerializer)
.id("server").soTimeout(10000)))
.enrichHeaders(f->f.header("abc","abc")))
.channel(directChannel())
.handle(Jms.outboundAdapter(ConnectionFactory())
.destination("jmsInbound"))
.get();
theFlow =this.flowContext.registration(flow).id("test.flow").register();
Существуют различные реализации Listener для проверки исключения в TCP-соединении try {} catchБлок () не вызывает никаких исключений.
Пожалуйста, предоставьте подходящий подход для обработки Исключений для адаптеров. В настоящее время я использую Слушатели для различных событий, чтобы знать, что с адаптерами tcp что-то не так.
После применения этого подхода, предоставленного г-ном Артемом Биланом
@EventListener
public void handleTcpConnectionServerExceptionEvent(TcpConnectionServerExceptionEvent event){
System.out.println("connection exception server :::"+event);
this.status = event.getCause().getMessage();
AbstractConnectionFactory server = (AbstractConnectionFactory)event.getSource();
System.out.println(server.getComponentName());
this.flowContext.remove(server.getComponentName()+"out.flow");
}
Я могу удалить поток с помощью FlowId, но не могу поймать исключение.консоль и не может быть обработано Даже я изменил метод на
private void createServerConnection(HostConnection hostConnection) throws Throwable{}
и обработал этиИсключение с помощью try {} catch (Throwable t) {} в вызывающей функции
Exception in thread "pool-4-thread-1" java.lang.NullPointerException
Исключение описано в более подробной форме в журналах, представленных ниже:
2018-05-17 21:01:40.850 INFO 18332 --- [nio-8080-exec-4]
.s.i.i.t.c.TcpNetServerConnectionFactory : started Co123, port=1234
2018-05-17 21:01:40.850 INFO 18332 --- [nio-8080-exec-4] o.s.i.ip.tcp.TcpReceivingChannelAdapter : started org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter#3
2018-05-17 21:01:40.851 ERROR 18332 --- [pool-5-thread-1] .s.i.i.t.c.TcpNetServerConnectionFactory : Error on ServerSocket; port = 1234
java.net.BindException: Address already in use: JVM_Bind
at java.net.DualStackPlainSocketImpl.bind0(Native Method) ~[na:1.8.0_111]
at java.net.DualStackPlainSocketImpl.socketBind(Unknown Source) ~[na:1.8.0_111]
at java.net.AbstractPlainSocketImpl.bind(Unknown Source) ~[na:1.8.0_111]
at java.net.PlainSocketImpl.bind(Unknown Source) ~[na:1.8.0_111]
at java.net.ServerSocket.bind(Unknown Source) ~[na:1.8.0_111]
at java.net.ServerSocket.<init>(Unknown Source) ~[na:1.8.0_111]
at java.net.ServerSocket.<init>(Unknown Source) ~[na:1.8.0_111]
at javax.net.DefaultServerSocketFactory.createServerSocket(Unknown Source) ~[na:1.8.0_111]
at org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory.createServerSocket(TcpNetServerConnectionFactory.java:211) ~[spring-integration-ip-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory.run(TcpNetServerConnectionFactory.java:106) ~[spring-integration-ip-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [na:1.8.0_111]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [na:1.8.0_111]
at java.lang.Thread.run(Unknown Source) [na:1.8.0_111]
connection exception server :::TcpConnectionServerExceptionEvent [source=Co123, port=1234, cause=java.net.BindException: Address already in use: JVM_Bind]
Co123
2018-05-17 21:01:40.851 INFO 18332 --- [pool-5-thread-1] o.s.i.ip.tcp.TcpReceivingChannelAdapter : stopped org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter#3
2018-05-17 21:01:40.851 INFO 18332 --- [pool-5-thread-1] o.s.i.endpoint.EventDrivenConsumer : Removing {transformer} as a subscriber to the 'Co123out.flow.channel#0' channel
2018-05-17 21:01:40.852 INFO 18332 --- [pool-5-thread-1] o.s.integration.channel.DirectChannel : Channel 'application.Co123out.flow.channel#0' has 0 subscriber(s).
2018-05-17 21:01:40.852 INFO 18332 --- [pool-5-thread-1] o.s.i.endpoint.EventDrivenConsumer : stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#11
2018-05-17 21:01:40.852 INFO 18332 --- [pool-5-thread-1] o.s.i.endpoint.EventDrivenConsumer : Removing {jms:outbound-channel-adapter} as a subscriber to the 'Co123out.flow.channel#1' channel
2018-05-17 21:01:40.852 INFO 18332 --- [pool-5-thread-1] o.s.integration.channel.DirectChannel : Channel 'application.Co123out.flow.channel#1' has 0 subscriber(s).
2018-05-17 21:01:40.852 INFO 18332 --- [pool-5-thread-1] o.s.i.endpoint.EventDrivenConsumer : stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#12
Exception in thread "pool-4-thread-1" java.lang.NullPointerException
at org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory.run(TcpNetServerConnectionFactory.java:185)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Exception in thread "pool-5-thread-1" java.lang.NullPointerException
at org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory.run(TcpNetServerConnectionFactory.java:185)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)`