Настройте динамический TCP-сервер для отправки сообщения подключенному к клиенту Spring Integration с использованием Java DSL - PullRequest
0 голосов
/ 30 мая 2018

Я пытаюсь создать поток интеграции:

Connection A : [ ActiveMq (queue1) ---> TCP Server (1111) ](spring boot Application) ---> [ExternalApplication (client connected to Server running on port (1111))] (Application on different Technology (VB))

Connection B : [ ActiveMq (queue2) ---> TCP Server (2222) ](spring boot Application) ---> [ExternalApplication (client connected to Server running on port (2222))] (Application on different Technology (VB))

Итак, вышеупомянутый поток изображает следующий сценарий:

  1. У меня есть приложение весенней загрузки (скажем, springUtility), которое подключается к внешнему приложению (скажем, externalutility) по какой-то другой технологии.

  2. Соединение между springutility и externalUtility осуществляется через TCPпротокол.

  3. Будет иметься более одного соединения между обоими утилитами, как на схеме подключения выше A и соединением B.

  4. Потоксообщения будет от springUtilty к externalutility (в одну сторону)

  5. Теперь externalutility может работать в режиме клиента или в режиме сервера.Итак, если externalUtility запускается в режиме сервера, то моя подпружиненная утилита будет работать в режиме клиента (готово), а также для создания более одного соединения. Я запускаю приведенный ниже код для кода цикла:

Код:

flow = IntegrationFlows
            .from(Jms.inboundAdapter(ConnectionFactory())
                    .destination("rec"))
                    .channel(directChannel()).handle(Tcp.outboundAdapter(Tcp.nioClient("172.18.11.22",5555)
                             .serializer(customSerializer)
                             .deserializer(customSerializer)
                             .id(hostConnection.getConnectionNumber()).soTimeout(10000)))
                            .get();

    flowRegistration = this.flowContext.registration(flow).id("in.flow").register();

Проблема: при запуске externalutility в режиме клиента утилита Spring должна создать сервер, к которому будет подключаться внешняя утилита, и затем начать получать сообщения от утилиты Spring: что я пытался реализовать неправильноway.

Итак, наконец, я должен реализовать вышеупомянутое соединение в качестве контрольного примера для проверки потока, означает, что утилита spring открыла порт сервера (1111 и 2222), к которому подключается внешнее приложениеэти два порта и начинают получать сообщения от утилиты Spring соответственно из queue1 и queue2.Оба соединения будут иметь отдельный поток (как только будет создан один поток, мы можем использовать цикл for, чтобы создать нужное соединение).

Код:

flow = IntegrationFlows
            .from(Jms.inboundAdapter(ConnectionFactory())
                    .destination("rec"))
                    .channel(directChannel()).handle(Tcp.outboundAdapter(Tcp.netServer(5555)
                             .serializer(customSerializer)
                             .deserializer(customSerializer)
                             .id(hostConnection.getConnectionNumber()).soTimeout(10000)))
                            .get();

    flowRegistration = this.flowContext.registration(flow).id("in.flow").register();

`код:

2018-05-30 17:39:39.171  INFO 15776 --- [nio-8080-exec-1] o.s.i.endpoint.EventDrivenConsumer       : Adding {jms:outbound-channel-adapter} as a subscriber to the 'Conn1311out.flow.channel#0' channel
2018-05-30 17:39:39.172  INFO 15776 --- [nio-8080-exec-1] o.s.integration.channel.DirectChannel    : Channel 'application.Conn1311out.flow.channel#0' has 1 subscriber(s).
2018-05-30 17:39:39.172  INFO 15776 --- [nio-8080-exec-1] o.s.i.endpoint.EventDrivenConsumer       : started org.springframework.integration.config.ConsumerEndpointFactoryBean#3
2018-05-30 17:39:41.965  INFO 15776 --- [nio-8080-exec-1] o.s.i.endpoint.EventDrivenConsumer       : Adding {ip:tcp-outbound-channel-adapter} as a subscriber to the 'Conn1311in.flow.channel#0' channel
2018-05-30 17:39:41.965  INFO 15776 --- [nio-8080-exec-1] o.s.integration.channel.DirectChannel    : Channel 'application.Conn1311in.flow.channel#0' has 1 subscriber(s).
2018-05-30 17:39:41.966  INFO 15776 --- [nio-8080-exec-1] .s.i.i.t.c.TcpNetServerConnectionFactory : started Conn1311, port=3333
2018-05-30 17:39:41.966  INFO 15776 --- [nio-8080-exec-1] o.s.i.endpoint.EventDrivenConsumer       : started org.springframework.integration.config.ConsumerEndpointFactoryBean#4
2018-05-30 17:39:41.966  INFO 15776 --- [pool-1-thread-1] .s.i.i.t.c.TcpNetServerConnectionFactory : Conn1311, port=3333 No listener bound to server connection factory; will not read; exiting...
2018-05-30 17:39:41.971  INFO 15776 --- [nio-8080-exec-1] o.s.i.e.SourcePollingChannelAdapter      : started org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0

Сначала создайте сервер на предоставленном порту, но выйдите со следующим сообщением

No listener bound to server connection factory; will not read; exiting...

Когда я создаюклиент работает нормально, но при создании сервера выдает ошибку

РЕДАКТИРОВАТЬ:

Я использовал следующий подход для создания динамически другого сервера и отправки сообщения подключенному клиенту от его соответствующегоклиент activemq

  1. Я создал входящий адаптер, как показано ниже:

    IntegrationFlow flow;
    CustomSerializer customSerializer = getCustomSerializer(String.valueOf(hostConnection.getMaxMessageLength()),
            hostConnection.getTerminatorChar());
    
    TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
    TcpNetServerConnectionFactory cf = new TcpNetServerConnectionFactory(1234));
    cf.setSerializer(customSerializer);
    cf.setDeserializer(customSerializer);
    handler.setConnectionFactory(cf);
    
         flow = IntegrationFlows
            .from(Tcp.inboundAdapter(cf).id("adapter")).handle(handler)
            .get();
    
         this.flowContext.registration(flow).id("inflow")
         .addBean(hostConnection.getConnectionNumber(),cf)
         .addBean(hostConnection.getConnectionNumber()+"handler",handler)
         .register();
    
  2. Я создал поток Jms, в котором сообщение из очередиотправляется на маршрутизатор, как показано ниже:

        flow = IntegrationFlows
            .from(Jms.inboundAdapter(activeMQConnectionFactory)
            .destination(hostConnection.getConnectionNumber() +"rec")) 
            .channel(directChannel()).route(new ServerRouter())
            .get(); 
    
  3. когда клиент подключился к моему серверу, я создаю поток из прослушивателя handleTcpConnectionCloseEvent(TcpConnectionOpenEvent event) и создаю поток как:

    void createServerFlow(TcpConnectionOpenEvent event) {
        String connectionNumber = event.getConnectionFactoryName();
        TcpNetConnection server = (TcpNetConnection) event.getSource();
        TcpSendingMessageHandler handler = (TcpSendingMessageHandler) ac.getBean(connectionNumber + "handler");
    
        IntegrationFlow flow = f -> f.enrichHeaders(e -> e.header(IpHeaders.CONNECTION_ID, event.getConnectionId()))
                .handle(handler);
        IntegrationFlowRegistration flowregister = this.flowContext.registration(flow).id("outclient").register();
    
        MessageChannel channel = flowregister.getInputChannel();
    
        this.subFlows.put(connectionNumber + "server", channel);
    }
    
  4. Мой сервер Поиск роутераканал и отправить на целевой канал

    protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
    String serverChannel = (String) message.getHeaders().get("connectionnumber");
    MessageChannel channel = HostConnectionRepository.subFlows
            .get(serverChannel+"server");
    
    return Collections.singletonList(channel);
    

    }

  5. когда меня отключат, я удаляю поток из flowContext, созданного в step 3.

Проблема: Когда я удаляю поток, бин, связанный с потоком, также удаляется, как написано в документации, что связанные бины также будут отбрасываться.

Это удаляет мой bean-компонент обработчика, который закрывает адаптер TCP и фактическое соединение, следовательно, как только он отключается, он не может подключиться снова

Как я могу отбросить поток outclientflow без удаления bean-компонента обработчика?

Ответы [ 2 ]

0 голосов
/ 30 мая 2018

Непонятно, что вы пытаетесь сделать;клиенты обычно инициируют соединения, а не предлагают порт для подключения других.

Даже если кто-то подключается к этому порту, клиент не будет знать, в какой сокет отправить сообщение.Таким образом, вы получите ошибки во время выполнения.

Нет прослушивателя, привязанного к фабрике соединений с сервером;не буду читать;выход ...

В этом сообщении просто говорится, что на заводе не зарегистрирован ни один компонент для приема входящих сообщений.

0 голосов
/ 30 мая 2018

Tcp.outboundAdapter() не является TcpListener.Это просто отправитель на сокет TCP.Вот почему вы получаете это исключение.

Если мы говорим о TCP-сервере, он должен быть слушателем, входящей конечной точкой.

Совершенно непонятно, зачем динамически создавать TCP-сервер при операции отправки ...

Вам необходимо переосмыслить свою логику и иметь такой TCP-сервер для прослушивания действительно внеэто динамическая логика.И здесь будет использоваться только вариант клиента для отправки пакетов TCP на требуемый порт TCP.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...