Как настроить адаптеры интеграции Spring для простого подключения клиента и сервера, отправляющего сообщения - PullRequest
0 голосов
/ 13 июля 2020

Я пытаюсь реализовать следующий сценарий с помощью Spring Integration:

Мне нужен клиент для подключения к серверу через TCP IP и ожидания получения сообщений в течение 30 секунд. Мне нужен сервер для отправки от 0 до n сообщений подключенному клиенту. Мне нужен способ запускать и останавливать передачу каналов без потери сообщений. Мне нужно изменить порт, который сервер прослушивает между остановкой и запуском.

Пока это моя конфигурация:

@Configuration
public class TcpConfiguration {
    private static Logger LOG = LoggerFactory.getLogger(TcpConfiguration.class);

    @Value("${port}")
    private Integer port;

    @Value("${so-timeout}")
    private Integer soTimeout;

    @Value("${keep-alive}")
    private Boolean keepAlive;

    @Value("${send-timeout}")
    private Integer sendTimeout;

    @Bean
    public AbstractServerConnectionFactory getMyConnFactory() {
        LOG.debug("getMyConnFactory");
        TcpNetServerConnectionFactory factory = new TcpNetServerConnectionFactory(port);
        LOG.debug("getMyConnFactory port={}", port);
        factory.setSoTimeout(soTimeout);
        LOG.debug("getMyConnFactory soTimeout={}", soTimeout);
        factory.setSoKeepAlive(true);
        LOG.debug("getMyConnFactory keepAlive={}", keepAlive);
        return factory;
    }

    @Bean
    public AbstractEndpoint getMyChannelAdapter() {
        LOG.debug("getMyChannelAdapter");
        TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
        adapter.setConnectionFactory(getMyConnFactory());
        adapter.setOutputChannel(myChannelIn());
        adapter.setSendTimeout(sendTimeout);
        LOG.debug("getMyChannelAdapter adapter={}", adapter.getClass().getName());
        return adapter;
    }

    @Bean
    public MessageChannel myChannelIn() {
        LOG.debug("myChannelIn");
        return new DirectChannel();
    }

    @Bean
    @Transformer(inputChannel = "myChannelIn", outputChannel = "myServiceChannel")
    public ObjectToStringTransformer myTransformer() {
        LOG.debug("myTransformer");
        return new ObjectToStringTransformer();
    }

    @ServiceActivator(inputChannel = "myServiceChannel")
    public void service(String in) {
        LOG.debug("service received={}", in);
    }

    @Bean
    public MessageChannel myChannelOut() {
        LOG.debug("myChannelOut");
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow myOutbound() {
        LOG.debug("myOutbound");
        return IntegrationFlows.from(myChannelOut())
                .handle(mySender())
                .get();
    }

    @Bean
    public MessageHandler mySender() {
        LOG.debug("mySender");
        TcpSendingMessageHandler tcpSendingMessageHandler = new TcpSendingMessageHandler();
        tcpSendingMessageHandler.setConnectionFactory(getMyConnFactory());
        return tcpSendingMessageHandler;
    }
}

Пожалуйста, посоветуйте!

Чтобы сменить сервер port Я бы отключил контекст приложения и перезапустил его после настройки нового порта на удаленном сервере конфигурации. Могу я просто закрыть контекст приложения, не нарушая текущую передачу сообщения? Я не знаю, как обращаться с клиентом только для подключения.

1 Ответ

0 голосов
/ 13 июля 2020

Используйте динамику c регистрацию потока ; просто получите соединение, чтобы открыть его без отправки.

@SpringBootApplication
public class So62867670Application {

    public static void main(String[] args) {
        SpringApplication.run(So62867670Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(DynamicTcpReceiver receiver) {
        return args -> { // Just a demo to show starting/stopping
            receiver.connectAndListen(1234);
            System.in.read();
            receiver.stop();
            System.in.read();
            receiver.connectAndListen(1235);
            System.in.read();
            receiver.stop();
        };
    }

}

@Component
class DynamicTcpReceiver {

    @Autowired
    private IntegrationFlowContext context;

    private IntegrationFlowRegistration registration;

    public void connectAndListen(int port) throws InterruptedException {
        TcpClientConnectionFactorySpec client = Tcp.netClient("localhost", port)
                .deserializer(TcpCodecs.lf());
        IntegrationFlow flow = IntegrationFlows.from(Tcp.inboundAdapter(client))
                .transform(Transformers.objectToString())
                .handle(System.out::println)
                .get();
        this.registration = context.registration(flow).register();
        client.get().getConnection(); // just open the single shared connection
    }

    public void stop() {
        if (this.registration != null) {
            this.registration.destroy();
            this.registration = null;
        }
    }
}

EDIT

А это на стороне сервера ...

@SpringBootApplication
@EnableScheduling
public class So62867670ServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(So62867670ServerApplication.class, args);
    }

    @Bean
    public ApplicationRunner runner(DynamicTcpServer receiver) {
        return args -> { // Just a demo to show starting/stopping
            receiver.tcpListen(1234);
            System.in.read();
            receiver.stop(1234);
            System.in.read();
            receiver.tcpListen(1235);
            System.in.read();
            receiver.stop(1235);
        };
    }

}

@Component
class DynamicTcpServer {

    private static final Logger LOG = LoggerFactory.getLogger(DynamicTcpServer.class);

    @Autowired
    private IntegrationFlowContext flowContext;

    @Autowired
    private ApplicationContext appContext;

    private final Map<Integer, IntegrationFlowRegistration> registrations = new HashMap<>();

    private final Map<String, Entry<Integer, AtomicInteger>> clients = new ConcurrentHashMap<>();

    public void tcpListen(int port) {
        TcpServerConnectionFactorySpec server = Tcp.netServer(port)
                .id("server-" + port)
                .serializer(TcpCodecs.lf());
        server.get().registerListener(msg -> false); // dummy listener so the accept thread doesn't exit
        IntegrationFlow flow = f -> f.handle(Tcp.outboundAdapter(server));
        this.registrations.put(port, flowContext.registration(flow).register());
    }

    public void stop(int port) {
        IntegrationFlowRegistration registration = this.registrations.remove(port);
        if (registration != null) {
            registration.destroy();
        }
    }

    @EventListener
    public void closed(TcpConnectionOpenEvent event) {
        LOG.info(event.toString());
        String connectionId = event.getConnectionId();
        String[] split = connectionId.split(":");
        int port = Integer.parseInt(split[2]);
        this.clients.put(connectionId, new AbstractMap.SimpleEntry<>(port, new AtomicInteger()));
    }

    @EventListener
    public void closed(TcpConnectionCloseEvent event) {
        LOG.info(event.toString());
        this.clients.remove(event.getConnectionId());
    }

    @EventListener
    public void listening(TcpConnectionServerListeningEvent event) {
        LOG.info(event.toString());
    }

    @Scheduled(fixedDelay = 5000)
    public void sender() {
        this.clients.forEach((connectionId, portAndCount) -> {
            IntegrationFlowRegistration registration = this.registrations.get(portAndCount.getKey());
            if (registration != null) {
                LOG.info("Sending to " + connectionId);
                registration.getMessagingTemplate().send(MessageBuilder.withPayload("foo")
                        .setHeader(IpHeaders.CONNECTION_ID, connectionId).build());
                if (portAndCount.getValue().incrementAndGet() > 9) {
                    this.appContext.getBean("server-" + portAndCount.getKey(), TcpNetServerConnectionFactory.class)
                        .closeConnection(connectionId);
                }
            }
        });
    }

}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...