Слушайте канал TCP с пружинной интеграцией - PullRequest
0 голосов
/ 20 марта 2020

Я хочу подключиться к двум разным каналам TCP, зашифрованным TLS, и одновременно читать и отправлять json данные. Входящие, которые будут, конечно, как-то обработаны. Это не обязательно должно быть обработано в Spring-интеграции, но может быть помещено в очередь или что-то еще.

Я не хочу жестко кодировать хост и порты, потому что я получаю их во время выполнения от REST API, который Я звоню.

То, что я отправляю, не обязательно будет генерировать ответ, и я не обязательно ожидаю ответа от данных, которые я отправляю.

У меня возникли проблемы с реализацией это с весны интеграции. Мне удалось получить что-то вроде этого, но не совсем работающее, выполнив:

    public static IntegrationFlow RegisterFeedFlow(final IntegrationFlowContext flowContext,
            final String id, final String host, final int port) {
        IntegrationFlow feedFlow = f -> f
                .handle(Tcp
                        .outboundGateway(Tcp.netClient(host, port).serializer(TcpCodecs.crlf())
                                .deserializer(TcpCodecs.lengthHeader1()))
                        .remoteTimeout(m -> 5000))
                .transform(Transformers.objectToString()).handle(System.out::println);


        flowContext.registration(feedFlow).id(id).register();

        return feedFlow;
    }

Чего здесь не хватает:

  1. TLS.
  2. Потому что я используя Tcp.outboundGateway (), он всегда ожидает ответа для данных, которые я отправляю. Я не хочу этого.

Я предполагаю, что я мог бы автоматически кодировать и декодировать json, помещая что-то в .serializer () и .deserializer (). Правильно ли это предположение?

Как бы это правильно реализовать?

1 Ответ

0 голосов
/ 20 марта 2020
@SpringBootApplication
public class Gitter66Application {

    public static void main(String[] args) {
        SpringApplication.run(Gitter66Application.class, args);
    }

    @Bean
    public TcpConnectionFactoryFactoryBean privateFeedClient() {
        TcpConnectionFactoryFactoryBean fb = new TcpConnectionFactoryFactoryBean("client");
        fb.setHost("localhost");
        fb.setPort(1234);
        fb.setSerializer(TcpCodecs.crlf());
        fb.setDeserializer(TcpCodecs.lengthHeader1());
        fb.setSslContextSupport(new DefaultTcpSSLContextSupport("keystore.ks",
                "trustStore.ks", "keyStorePassword", "trustStorePassword"));
        return fb;
    }

    @Bean
    public TcpConnectionFactoryFactoryBean privateFeedServer() {
        TcpConnectionFactoryFactoryBean fb = new TcpConnectionFactoryFactoryBean("server");
        fb.setPort(1234);
        fb.setSerializer(TcpCodecs.lengthHeader1());
        fb.setDeserializer(TcpCodecs.crlf());
        fb.setSslContextSupport(new DefaultTcpSSLContextSupport("keystore.ks",
                "trustStore.ks", "keyStorePassword", "trustStorePassword"));
        return fb;
    }

    @Bean
    public IntegrationFlow flowOut(AbstractClientConnectionFactory cf) {
        return f -> f.handle(Tcp.outboundAdapter(cf));
    }

    @Bean
    public IntegrationFlow flowIn(AbstractServerConnectionFactory cf) {
        return IntegrationFlows.from(Tcp.inboundAdapter(cf))
                .transform(Transformers.objectToString())
                .handle(System.out::println)
                .get();
    }

    @Bean
    public ApplicationRunner runner(@Qualifier("flowOut.input") MessageChannel channel) {
        return args -> {
            channel.send(new GenericMessage<>("foo"));
        };
    }

}

Результат:

GenericMessage [payload=foo, headers={ip_tcp_remotePort=62471, ...

РЕДАКТИРОВАТЬ

Вы, конечно, можете регистрировать потоки в соответствии с вашими вопросами. Для динамической регистрации заводов:

    private TcpConnectionFactoryFactoryBean privateFeedClient(String host, int port) {
        TcpConnectionFactoryFactoryBean fb = new TcpConnectionFactoryFactoryBean("client");
        fb.setHost(host);
        fb.setPort(port);
        fb.setSerializer(TcpCodecs.crlf());
        fb.setDeserializer(TcpCodecs.lengthHeader1());
        fb.setSslContextSupport(new DefaultTcpSSLContextSupport("keystore.ks",
                "trustStore.ks", "keyStorePassword", "trustStorePassword"));
        return fb;
    }

    private TcpConnectionFactoryFactoryBean privateFeedServer(int port) {
        TcpConnectionFactoryFactoryBean fb = new TcpConnectionFactoryFactoryBean("server");
        fb.setPort(1234);
        fb.setSerializer(TcpCodecs.lengthHeader1());
        fb.setDeserializer(TcpCodecs.crlf());
        fb.setSslContextSupport(new DefaultTcpSSLContextSupport("keystore.ks",
                "trustStore.ks", "keyStorePassword", "trustStorePassword"));
        return fb;
    }

    @Bean
    public ApplicationRunner runner(GenericApplicationContext context) {

        return args -> {
            TcpConnectionFactoryFactoryBean server = privateFeedServer(1234);
            context.registerBean("server", TcpConnectionFactoryFactoryBean.class,
                    () -> server);
            TcpConnectionFactoryFactoryBean client = privateFeedClient("localhost", 1234);
            context.registerBean("client", TcpConnectionFactoryFactoryBean.class,
                    () -> client);
            // register flows
        };
    }

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...