Как отправлять асинхронные запросы / ответы через сотрудничающие канальные адаптеры без корреляции данных - PullRequest
0 голосов
/ 01 февраля 2019

Сводка

Я использую поддержку TCP & UDP от Spring Integration для передачи потокового TCP-трафика через мое приложение на вышестоящий сервер, а затем для передачи ответа этого сервера обратно через мое приложение наклиент.Хотя это двусторонняя связь, мне нужна большая асинхронная пропускная способность, поэтому я не могу использовать шлюзы.Вместо этого я пытаюсь использовать совместную работу адаптеров исходящего и входящего канала, как описано в разделе 34.8.2.

Настройка компонента интеграции

Запрос

ATcpReceivingChannelAdapter получает запросы через TcpNetServerConnectionFactory через порт 6060. Он помещает эти запросы в запросы QueueChannel.Запросы обрабатываются TcpSendingMessageHandler, который отправляет запрос по клиентскому соединению, созданному TcpNetClientConnectionFactory.Это соединение отправляет запрос из моего приложения и на вышестоящий сервер.

Response

A TcpReceivingChannelAdapter получает ответы от вышестоящего сервера через соединение TcpNetClientConnectionFactory.Относит эти ответы на ответы QueueChannel.Ответы принимаются TcpSendingMessageHandler, который пытается отправить ответ обратно клиенту через соединение с исходного TcpNetServerConnectionFactory.Это окончательное соединение является тем, что не удается.

    @Bean
    public PollableChannel requestChannel() {
        return new QueueChannel(1000);
    }

    @Bean
    public PollableChannel replyChannel() {
        return new QueueChannel(1000);
    }

    @Bean
    public TcpNetServerConnectionFactory serverFactory() {
        TcpNetServerConnectionFactory serverFactory = new TcpNetServerConnectionFactory(6060);
        serverFactory.setSerializer(new ByteArrayLengthHeaderSerializer(2));
        serverFactory.setDeserializer(new ByteArrayLengthHeaderSerializer(2));
        serverFactory.setSingleUse(false);
        return serverFactory;
    }

    @Bean
    public TcpNetClientConnectionFactory clientFactory() {
        TcpNetClientConnectionFactory clientFactory = new TcpNetClientConnectionFactory("127.0.0.1", 6080);
        clientFactory.setSerializer(new ByteArrayLengthHeaderSerializer(2));
        clientFactory.setDeserializer(new ByteArrayLengthHeaderSerializer(2));
        clientFactory.setSingleUse(false);
        return clientFactory;
    }

    @Bean
    public TcpReceivingChannelAdapter inboundRequestAdapter() {
        TcpReceivingChannelAdapter inboundRequestAdapter = new TcpReceivingChannelAdapter();
        inboundRequestAdapter.setConnectionFactory(serverFactory());
        inboundRequestAdapter.setOutputChannel(requestChannel());
        return inboundRequestAdapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "requestChannel", poller = @Poller(fixedDelay = "50", receiveTimeout = "5000"))
    public TcpSendingMessageHandler outboundRequestAdapter() {
        TcpSendingMessageHandler outboundRequestAdapter = new TcpSendingMessageHandler();
        outboundRequestAdapter.setConnectionFactory(clientFactory());
        return outboundRequestAdapter;
    }

    @Bean
    public TcpReceivingChannelAdapter inboundReplyAdapter() {
        TcpReceivingChannelAdapter inboundReplyAdapter = new TcpReceivingChannelAdapter();
        inboundReplyAdapter.setConnectionFactory(clientFactory());
        inboundReplyAdapter.setOutputChannel(replyChannel());
        return inboundReplyAdapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "replyChannel", poller = @Poller(fixedDelay = "50", receiveTimeout = "5000"))
    public TcpSendingMessageHandler outboundReplyAdapter() {
        TcpSendingMessageHandler outboundReplyAdapter = new TcpSendingMessageHandler();
        outboundReplyAdapter.setConnectionFactory(serverFactory());
        return outboundReplyAdapter;
    }

Фактический результат

Ошибка:

Unable to find outbound socket for GenericMessage

Полная трассировка стека:

2019-02-01 14:10:55.315 ERROR 32553 --- [ask-scheduler-2] o.s.i.ip.tcp.TcpSendingMessageHandler    : Unable to find outbound socket for GenericMessage [payload=byte[297], headers={ip_tcp_remotePort=6080, ip_connectionId=localhost:6080:51339:a3f66802-b194-4564-99c7-f194e55ddb11, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=bc36ec21-e2ae-405e-afa9-c0ec2f2eff8d, ip_hostname=localhost, timestamp=1549051855315}]
2019-02-01 14:10:55.319 ERROR 32553 --- [ask-scheduler-2] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: Unable to find outbound socket, failedMessage=GenericMessage [payload=byte[297], headers={ip_tcp_remotePort=6080, ip_connectionId=localhost:6080:51339:a3f66802-b194-4564-99c7-f194e55ddb11, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=bc36ec21-e2ae-405e-afa9-c0ec2f2eff8d, ip_hostname=localhost, timestamp=1549051855315}]
    at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageInternal(TcpSendingMessageHandler.java:123)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
    at org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper.handleRequestMessage(ReplyProducingMessageHandlerWrapper.java:49)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
    at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:143)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:390)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Это делаетсмысл.Мне известно, что TcpReceivingChannelAdapter устанавливает поле заголовка сообщения ip_connectionId, когда оно пересылает сообщение.Поскольку у меня сейчас нет никакой логики корреляции, заголовок идентификатора от первого входящего адаптера теряется, когда полезная нагрузка передается по восходящему каналу, а второй входящий адаптер генерирует новый заголовок идентификатора.

В результатекогда ответ возвращается к последнему исходящему адаптеру, заголовок идентификатора не соответствует ничему, о чем знает соответствующий входящий адаптер.Таким образом, он не знает, какое соединение использовать для отправки ответа.

Мой вопрос таков: есть ли способ установить соединение «по умолчанию» или дополнить полезную нагрузку корреляцией данных без отправки этого восходящего потока??

Проблема в том, что мое приложение должно быть прозрачным прокси по отношению к вышестоящему серверу.Если я вообще увеличу полезную нагрузку коррелирующими данными, вышестоящий сервер отклонит ее.

1 Ответ

0 голосов
/ 02 февраля 2019

Трудно сопоставить запрос / ответ без данных, содержащих информацию о корреляции.

TcpOutboundGateway может сделать это, потому что для корреляции используется сам сокет;только один запрос может быть ожидающим для каждого сокета одновременно.CachingClientConnectionFactory разрешает параллелизм в шлюзе, поддерживая пул сокетов.

Одним из методов может быть настраиваемая фабрика клиентских соединений, которая поддерживает однозначное сопоставление между соединениями фабрики серверов и исходящими соединениями.Затем, когда ответ получен, найдите соответствующее соединение фабрики серверов, на которое нужно отправить ответ.Для этого потребуется пара карт: идентификатор соединения с сервером и соединение с клиентом и идентификатор соединения с сервером.

Если вы нашли решение, рассмотрите возможность добавления его обратно в платформу.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...