Spring Integration: клиент / сервер TCP открывает одно клиентское соединение на соединение с сервером - PullRequest
0 голосов
/ 06 июля 2018

Я пытаюсь реализовать клиент-серверное приложение TCP с помощью Spring Integration, где мне нужно открыть один сокет TCP-клиента для каждого входящего соединения TCP-сервера.

По сути, у меня есть группа устройств IoT, которые взаимодействуют с внутренним сервером через необработанные сокеты TCP. Мне нужно реализовать дополнительные функции в системе. Но программное обеспечение на устройствах и на сервере является закрытым исходным кодом, поэтому я ничего не могу с этим поделать. Поэтому я подумал о том, чтобы разместить промежуточное программное обеспечение между устройствами и сервером, которое будет перехватывать эту связь клиент-сервер и обеспечивать дополнительную функциональность.

Я использую TcpNioServerConnectionFactory и TcpNioClientConnectionFactory с адаптерами входящего / исходящего канала для отправки / получения сообщений от всех сторон. Но в структуре сообщения нет информации, которая связывает сообщение с определенным устройством; поэтому мне приходится открывать новый клиентский сокет для серверной части каждый раз, когда новое соединение от нового устройства приходит на серверный сокет. Это клиентское соединение должно быть связано с жизненным циклом этого конкретного сокета сервера. Он никогда не должен использоваться повторно, и если по какой-либо причине этот клиентский сокет (серверная часть к промежуточному программному обеспечению) умирает, сокет сервера (промежуточное программное обеспечение к устройству) также должен быть закрыт. Как я могу пойти по этому поводу?

Редактировать: Моей первой мыслью было создание подкласса AbstractClientConnectionFactory, но, похоже, он ничего не делает, кроме как обеспечивает клиентское соединение при запросе. Стоит ли изучать подклассы адаптеров входящего / исходящего канала или где-либо еще? Я должен также упомянуть, что я также открыт для интеграционных решений, отличных от Spring, таких как Apache Camel, или даже для нестандартного решения с необработанными сокетами NIO.

Редактировать 2: Я попал на полпути, переключившись на TcpNetServerConnectionFactory и обернув клиентскую фабрику ThreadAffinityClientConnectionFactory, и устройства могут достичь штрафа сервера. Но когда сервер отсылает что-то обратно, я получаю ошибку Unable to find outbound socket for GenericMessage и клиентский сокет умирает. Я думаю, это потому, что у серверной части нет необходимого заголовка для правильной маршрутизации сообщения. Как я могу получить эту информацию? Мой класс конфигурации выглядит следующим образом:

@Configuration
@EnableIntegration
@IntegrationComponentScan
public class ServerConfiguration {

    @Bean
    public AbstractServerConnectionFactory serverFactory() {
        AbstractServerConnectionFactory factory = new TcpNetServerConnectionFactory(8000);
        factory.setSerializer(new MapJsonSerializer());
        factory.setDeserializer(new MapJsonSerializer());
        return factory;
    }

    @Bean
    public AbstractClientConnectionFactory clientFactory() {
        AbstractClientConnectionFactory factory = new TcpNioClientConnectionFactory("localhost", 3333);
        factory.setSerializer(new MapJsonSerializer());
        factory.setDeserializer(new MapJsonSerializer());
        factory.setSingleUse(true);
        return new ThreadAffinityClientConnectionFactory(factory);
    }

    @Bean
    public TcpReceivingChannelAdapter inboundDeviceAdapter(AbstractServerConnectionFactory connectionFactory) {
        TcpReceivingChannelAdapter inbound = new TcpReceivingChannelAdapter();
        inbound.setConnectionFactory(connectionFactory);
        return inbound;
    }

    @Bean
    public TcpSendingMessageHandler outboundDeviceAdapter(AbstractServerConnectionFactory connectionFactory) {
        TcpSendingMessageHandler outbound = new TcpSendingMessageHandler();
        outbound.setConnectionFactory(connectionFactory);
        return outbound;
    }

    @Bean
    public TcpReceivingChannelAdapter inboundBackendAdapter(AbstractClientConnectionFactory connectionFactory) {
        TcpReceivingChannelAdapter inbound = new TcpReceivingChannelAdapter();
        inbound.setConnectionFactory(connectionFactory);
        return inbound;
    }

    @Bean
    public TcpSendingMessageHandler outboundBackendAdapter(AbstractClientConnectionFactory connectionFactory) {
        TcpSendingMessageHandler outbound = new TcpSendingMessageHandler();
        outbound.setConnectionFactory(connectionFactory);
        return outbound;
    }

    @Bean
    public IntegrationFlow backendIntegrationFlow() {
        return IntegrationFlows.from(inboundBackendAdapter(clientFactory()))
                .log(LoggingHandler.Level.INFO)
                .handle(outboundDeviceAdapter(serverFactory()))
                .get();
    }

    @Bean
    public IntegrationFlow deviceIntegrationFlow() {
        return IntegrationFlows.from(inboundDeviceAdapter(serverFactory()))
                .log(LoggingHandler.Level.INFO)
                .handle(outboundBackendAdapter(clientFactory()))
                .get();
    }
}

1 Ответ

0 голосов
/ 06 июля 2018

Не совсем понятно, о чем вы спрашиваете, поэтому я предполагаю, что вы имеете в виду, что вам нужен весенний интеграционный прокси между вашими клиентами и серверами. Что-то вроде:

iot-device -> spring server -> message-transformation -> spring client -> back-end-server

Если это так, вы можете реализовать ClientConnectionIdAware фабрику клиентских соединений, которая включает стандартную фабрику.

В потоке интеграции свяжите входящий заголовок ip_connectionId в сообщении с потоком (в ThreadLocal).

Затем в фабрике клиентских соединений найдите соответствующее исходящее соединение на карте, используя значение ThreadLocal; если не найден (или закрыт), создайте новый и сохраните его на карте для дальнейшего использования.

Реализация ApplictionListener (или @EventListener) для прослушивания TcpConnectionCloseEvent s от фабрики соединений с сервером и close() соответствующего исходящего соединения.

Это звучит как классное улучшение, так что подумайте над тем, чтобы добавить его обратно в фреймворк.

EDIT

Версия 5.0 добавила ThreadAffinityClientConnectionFactory, которая работала бы из коробки с TcpNetServerConnectionFactory, так как каждое соединение получает свой собственный поток.

С TcpNioServerConnectionFactory вам потребуется дополнительная логика для динамического связывания соединения с потоком для каждого запроса.

EDIT2

@SpringBootApplication
public class So51200675Application {

    public static void main(String[] args) {
        SpringApplication.run(So51200675Application.class, args).close();
    }

    @Bean
    public ApplicationRunner runner() {
        return args -> {
            Socket socket = SocketFactory.getDefault().createSocket("localhost", 1234);
            socket.getOutputStream().write("foo\r\n".getBytes());
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            System.out.println(reader.readLine());
            socket.close();
        };
    }

    @Bean
    public Map<String, String> fromToConnectionMappings() {
        return new ConcurrentHashMap<>();
    }

    @Bean
    public Map<String, String> toFromConnectionMappings() {
        return new ConcurrentHashMap<>();
    }

    @Bean
    public IntegrationFlow proxyInboundFlow() {
        return IntegrationFlows.from(Tcp.inboundAdapter(serverFactory()))
                .transform(Transformers.objectToString())
                .<String, String>transform(s -> s.toUpperCase())
                .handle((p, h) -> {
                    mapConnectionIds(h);
                    return p;
                })
                .handle(Tcp.outboundAdapter(threadConnectionFactory()))
                .get();
    }

    @Bean
    public IntegrationFlow proxyOutboundFlow() {
        return IntegrationFlows.from(Tcp.inboundAdapter(threadConnectionFactory()))
                .transform(Transformers.objectToString())
                .<String, String>transform(s -> s.toUpperCase())
                .enrichHeaders(e -> e
                        .headerExpression(IpHeaders.CONNECTION_ID, "@toFromConnectionMappings.get(headers['"
                                + IpHeaders.CONNECTION_ID + "'])").defaultOverwrite(true))
                .handle(Tcp.outboundAdapter(serverFactory()))
                .get();
    }

    private void mapConnectionIds(Map<String, Object> h) {
        try {
            TcpConnection connection = threadConnectionFactory().getConnection();
            String mapping = toFromConnectionMappings().get(connection.getConnectionId());
            String incomingCID = (String) h.get(IpHeaders.CONNECTION_ID);
            if (mapping == null || !(mapping.equals(incomingCID))) {
                System.out.println("Adding new mapping " + incomingCID + " to " + connection.getConnectionId());
                toFromConnectionMappings().put(connection.getConnectionId(), incomingCID);
                fromToConnectionMappings().put(incomingCID, connection.getConnectionId());
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Bean
    public ThreadAffinityClientConnectionFactory threadConnectionFactory() {
        return new ThreadAffinityClientConnectionFactory(clientFactory()) {

            @Override
            public boolean isSingleUse() {
                return false;
            }

        };
    }

    @Bean
    public AbstractServerConnectionFactory serverFactory() {
        return Tcp.netServer(1234).get();
    }

    @Bean
    public AbstractClientConnectionFactory clientFactory() {
        AbstractClientConnectionFactory clientFactory = Tcp.netClient("localhost", 1235).get();
        clientFactory.setSingleUse(true);
        return clientFactory;
    }

    @Bean
    public IntegrationFlow serverFlow() {
        return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1235)))
                .transform(Transformers.objectToString())
                .<String, String>transform(p -> p + p)
                .get();
    }

    @Bean
    public ApplicationListener<TcpConnectionCloseEvent> closer() {
        return e -> {
            if (fromToConnectionMappings().containsKey(e.getConnectionId())) {
                String key = fromToConnectionMappings().remove(e.getConnectionId());
                toFromConnectionMappings().remove(key);
                System.out.println("Removed mapping " + e.getConnectionId() + " to " + key);
                threadConnectionFactory().releaseConnection();
            }
        };
    }

}

EDIT3

У меня отлично работает с MapJsonSerializer.

@SpringBootApplication
public class So51200675Application {

    public static void main(String[] args) {
        SpringApplication.run(So51200675Application.class, args).close();
    }

    @Bean
    public ApplicationRunner runner() {
        return args -> {
            Socket socket = SocketFactory.getDefault().createSocket("localhost", 1234);
            socket.getOutputStream().write("{\"foo\":\"bar\"}\n".getBytes());
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            System.out.println(reader.readLine());
            socket.close();
        };
    }

    @Bean
    public Map<String, String> fromToConnectionMappings() {
        return new ConcurrentHashMap<>();
    }

    @Bean
    public Map<String, String> toFromConnectionMappings() {
        return new ConcurrentHashMap<>();
    }

    @Bean
    public MapJsonSerializer serializer() {
        return new MapJsonSerializer();
    }

    @Bean
    public IntegrationFlow proxyRequestFlow() {
        return IntegrationFlows.from(Tcp.inboundAdapter(serverFactory()))
                .<Map<String, String>, Map<String, String>>transform(m -> {
                    m.put("foo", m.get("foo").toUpperCase());
                    return m;
                })
                .handle((p, h) -> {
                    mapConnectionIds(h);
                    return p;
                })
                .handle(Tcp.outboundAdapter(threadConnectionFactory()))
                .get();
    }

    @Bean
    public IntegrationFlow proxyReplyFlow() {
        return IntegrationFlows.from(Tcp.inboundAdapter(threadConnectionFactory()))
                .<Map<String, String>, Map<String, String>>transform(m -> {
                    m.put("foo", m.get("foo").toLowerCase() + m.get("foo"));
                    return m;
                })
                .enrichHeaders(e -> e
                        .headerExpression(IpHeaders.CONNECTION_ID, "@toFromConnectionMappings.get(headers['"
                                + IpHeaders.CONNECTION_ID + "'])").defaultOverwrite(true))
                .handle(Tcp.outboundAdapter(serverFactory()))
                .get();
    }

    private void mapConnectionIds(Map<String, Object> h) {
        try {
            TcpConnection connection = threadConnectionFactory().getConnection();
            String mapping = toFromConnectionMappings().get(connection.getConnectionId());
            String incomingCID = (String) h.get(IpHeaders.CONNECTION_ID);
            if (mapping == null || !(mapping.equals(incomingCID))) {
                System.out.println("Adding new mapping " + incomingCID + " to " + connection.getConnectionId());
                toFromConnectionMappings().put(connection.getConnectionId(), incomingCID);
                fromToConnectionMappings().put(incomingCID, connection.getConnectionId());
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Bean
    public ThreadAffinityClientConnectionFactory threadConnectionFactory() {
        return new ThreadAffinityClientConnectionFactory(clientFactory()) {

            @Override
            public boolean isSingleUse() {
                return false;
            }

        };
    }

    @Bean
    public AbstractServerConnectionFactory serverFactory() {
        return Tcp.netServer(1234)
                .serializer(serializer())
                .deserializer(serializer())
                .get();
    }

    @Bean
    public AbstractClientConnectionFactory clientFactory() {
        AbstractClientConnectionFactory clientFactory = Tcp.netClient("localhost", 1235)
                .serializer(serializer())
                .deserializer(serializer())
                .get();
        clientFactory.setSingleUse(true);
        return clientFactory;
    }

    @Bean
    public IntegrationFlow backEndEmulatorFlow() {
        return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1235)
                    .serializer(serializer())
                    .deserializer(serializer())))
                .<Map<String, String>, Map<String, String>>transform(m -> {
                    m.put("foo", m.get("foo") + m.get("foo"));
                    return m;
                })
                .get();
    }

    @Bean
    public ApplicationListener<TcpConnectionCloseEvent> closer() {
        return e -> {
            if (fromToConnectionMappings().containsKey(e.getConnectionId())) {
                String key = fromToConnectionMappings().remove(e.getConnectionId());
                toFromConnectionMappings().remove(key);
                System.out.println("Removed mapping " + e.getConnectionId() + " to " + key);
                threadConnectionFactory().releaseConnection();
            }
        };
    }

}

и

Добавление нового сопоставления localhost: 56998: 1234: 55c822a4-4252-45e6-9ef2-79263391f4be на localhost: 1235: 56999: 3d520ca9-2f3a-44c3-b05f-e59695b8c1b0 { "Foo": "barbarBARBAR"} Удалено отображение localhost: 56998: 1234: 55c822a4-4252-45e6-9ef2-79263391f4be на localhost: 1235: 56999: 3d520ca9-2f3a-44c3-b05f-e59695b8c1b0

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...