Обработка исключений IntegrationFlow - PullRequest
0 голосов
/ 17 мая 2018

Я пытаюсь создать поток для сообщения следующим образом:

TCPinboundAdapter ----> Брокер сообщений (ActiveMQ)

Поток:

Этот поток создается следующим образом

  1. Сообщение получено через TCP-соединение с адаптером TCP, который может быть клиентом или сервером.
  2. СообщениеПолученный на TCP-адаптер отправляется на JMS-адаптер (ActiveMQ Broker).

Код выглядит следующим образом:

@EventListener
public void handleTcpConnectionClientEvent(TcpConnectionFailedEvent event){

     TcpNioClientConnectionFactory tcp = (TcpNioClientConnectionFactory)event.getSource();
     System.out.println(tcp); 
     System.out.println("connection exception client :::"+event.getSource());

     this.status = event.toString();

 }
 @EventListener
 public void handleTcpConnectionServerExceptionEvent(TcpConnectionServerExceptionEvent event){
     System.out.println("connection exception server :::");

     this.status = event.toString();

 }

 // this method is invoked when the connection with the sever got disconnected
 @EventListener
 public void handleTcpConnectionServerEvent(TcpConnectionExceptionEvent event){
     System.out.println("connection exception serversssss :::"+event.getConnectionFactoryName());
     this.status = event.toString();

 }

 //when the connection got established (not for first time)
 @EventListener
 public void handleTcpConnectionCloseEvent(TcpConnectionOpenEvent event){
     System.out.println("connection opened :::"+event.getConnectionFactoryName());
    // status = event.toString();

 }

// create a server connection and flow to JMS  
private void createServerConnection(HostConnection hostConnection)  throws Throwable{
    this.status = "success";

    // IntegrationFlow flow;


IntegrationFlowRegistration theFlow;
     IntegrationFlow flow = 
IntegrationFlows.from(Tcp.inboundAdapter(Tcp.netServer(1234)
             .serializer(customSerializer)
             .deserializer(customSerializer)
             .id(hostConnection.getConnectionNumber()).soTimeout(10000)))
             .enrichHeaders(f->f.header("abc","abc")))
             .channel(directChannel())
             .handle(Jms.outboundAdapter(ConnectionFactory())
             .destination("jmsInbound"))
             .get();

           theFlow = this.flowContext.registration(flow).id("test.flow").register();


           if(this.status.equals("success"))
           createInboundFlow(hostConnection);

          // startConnection(hostConnection.getConnectionNumber());

}

Проблема:

Этот поток успешно создан и зарегистрирован в контексте приложения, когда нет исключений.Но в случае, когда есть исключение, т. Е. (BindException)

  1. Когда создается сервер для определенного порта, и порт уже используется, тогда это вызывает BindException, тогда также регистрируется поток. Итак, мыхотите, чтобы поток не регистрировался, когда есть исключение в любом из компонентов потока ниже.

    IntegrationFlowRegistration theFlow;
          IntegrationFlow flow = 
               IntegrationFlows.from(Tcp.inboundAdapter(Tcp.netServer(1234)
              .serializer(customSerializer)
              .deserializer(customSerializer)
              .id("server").soTimeout(10000)))
              .enrichHeaders(f->f.header("abc","abc")))
              .channel(directChannel())
              .handle(Jms.outboundAdapter(ConnectionFactory())
              .destination("jmsInbound"))
              .get();
    
          theFlow =this.flowContext.registration(flow).id("test.flow").register();
    

Существуют различные реализации Listener для проверки исключения в TCP-соединении try {} catchБлок () не вызывает никаких исключений.

Пожалуйста, предоставьте подходящий подход для обработки Исключений для адаптеров. В настоящее время я использую Слушатели для различных событий, чтобы знать, что с адаптерами tcp что-то не так.

После применения этого подхода, предоставленного г-ном Артемом Биланом

@EventListener public void handleTcpConnectionServerExceptionEvent(TcpConnectionServerExceptionEvent event){ System.out.println("connection exception server :::"+event); this.status = event.getCause().getMessage(); AbstractConnectionFactory server = (AbstractConnectionFactory)event.getSource(); System.out.println(server.getComponentName()); this.flowContext.remove(server.getComponentName()+"out.flow"); }

Я могу удалить поток с помощью FlowId, но не могу поймать исключение.консоль и не может быть обработано Даже я изменил метод на

private void createServerConnection(HostConnection hostConnection) throws Throwable{}

и обработал этиИсключение с помощью try {} catch (Throwable t) {} в вызывающей функции

Exception in thread "pool-4-thread-1" java.lang.NullPointerException

Исключение описано в более подробной форме в журналах, представленных ниже:

    2018-05-17 21:01:40.850  INFO 18332 --- [nio-8080-exec-4] 
    .s.i.i.t.c.TcpNetServerConnectionFactory : started Co123, port=1234
2018-05-17 21:01:40.850  INFO 18332 --- [nio-8080-exec-4] o.s.i.ip.tcp.TcpReceivingChannelAdapter  : started org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter#3
2018-05-17 21:01:40.851 ERROR 18332 --- [pool-5-thread-1] .s.i.i.t.c.TcpNetServerConnectionFactory : Error on ServerSocket; port = 1234

java.net.BindException: Address already in use: JVM_Bind
    at java.net.DualStackPlainSocketImpl.bind0(Native Method) ~[na:1.8.0_111]
    at java.net.DualStackPlainSocketImpl.socketBind(Unknown Source) ~[na:1.8.0_111]
    at java.net.AbstractPlainSocketImpl.bind(Unknown Source) ~[na:1.8.0_111]
    at java.net.PlainSocketImpl.bind(Unknown Source) ~[na:1.8.0_111]
    at java.net.ServerSocket.bind(Unknown Source) ~[na:1.8.0_111]
    at java.net.ServerSocket.<init>(Unknown Source) ~[na:1.8.0_111]
    at java.net.ServerSocket.<init>(Unknown Source) ~[na:1.8.0_111]
    at javax.net.DefaultServerSocketFactory.createServerSocket(Unknown Source) ~[na:1.8.0_111]
    at org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory.createServerSocket(TcpNetServerConnectionFactory.java:211) ~[spring-integration-ip-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory.run(TcpNetServerConnectionFactory.java:106) ~[spring-integration-ip-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [na:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [na:1.8.0_111]
    at java.lang.Thread.run(Unknown Source) [na:1.8.0_111]

connection exception server :::TcpConnectionServerExceptionEvent [source=Co123, port=1234, cause=java.net.BindException: Address already in use: JVM_Bind]
Co123
2018-05-17 21:01:40.851  INFO 18332 --- [pool-5-thread-1] o.s.i.ip.tcp.TcpReceivingChannelAdapter  : stopped org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter#3
2018-05-17 21:01:40.851  INFO 18332 --- [pool-5-thread-1] o.s.i.endpoint.EventDrivenConsumer       : Removing {transformer} as a subscriber to the 'Co123out.flow.channel#0' channel
2018-05-17 21:01:40.852  INFO 18332 --- [pool-5-thread-1] o.s.integration.channel.DirectChannel    : Channel 'application.Co123out.flow.channel#0' has 0 subscriber(s).
2018-05-17 21:01:40.852  INFO 18332 --- [pool-5-thread-1] o.s.i.endpoint.EventDrivenConsumer       : stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#11
2018-05-17 21:01:40.852  INFO 18332 --- [pool-5-thread-1] o.s.i.endpoint.EventDrivenConsumer       : Removing {jms:outbound-channel-adapter} as a subscriber to the 'Co123out.flow.channel#1' channel
2018-05-17 21:01:40.852  INFO 18332 --- [pool-5-thread-1] o.s.integration.channel.DirectChannel    : Channel 'application.Co123out.flow.channel#1' has 0 subscriber(s).
2018-05-17 21:01:40.852  INFO 18332 --- [pool-5-thread-1] o.s.i.endpoint.EventDrivenConsumer       : stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#12
Exception in thread "pool-4-thread-1" java.lang.NullPointerException
    at org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory.run(TcpNetServerConnectionFactory.java:185)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Exception in thread "pool-5-thread-1" java.lang.NullPointerException
    at org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory.run(TcpNetServerConnectionFactory.java:185)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)`

1 Ответ

0 голосов
/ 17 мая 2018

Вы регистрируете IntegrationFlow через:

this.flowContext.registration(flow).id("test.flow").register();

Этот же компонент his.flowContext и этот идентификатор для потока могут использоваться для уничтожения потока из любого другого места, например, прослушивателя событий, когдавы ловите упомянутые BindException:

    /**
 * Destroy an {@link IntegrationFlow} bean (as well as all its dependant beans)
 * for provided {@code flowId} and clean up all the local cache for it.
 * @param flowId the bean name to destroy from
 */
void remove(String flowId);
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...