Как реализовать простой сервис эхо-сокетов в Spring Integration DSL - PullRequest
0 голосов
/ 14 марта 2019

Пожалуйста,
Не могли бы вы помочь с реализацией простого, эхо-стиля, службы сокетов Heartbeat TCP в Spring Integration DSL? Точнее, как подключить адаптер / обработчик / шлюз к IntegrationFlows на стороне клиента и сервера. Трудно найти практические примеры для взаимодействия клиент-сервер Spring Integration DSL и TCP / IP.

Я думаю, я прибил большую часть кода, это просто то, что нужно соединить все вместе в IntegrationFlow.

В примерах SI есть пример echo service , но он написан в «старой» конфигурации XML, и я действительно изо всех сил пытаюсь преобразовать его в конфигурацию с помощью кода.

Служба «Мой пульс» - это простой сервер, ожидающий, пока клиент запросит «статус», и ответит «ОК».

Нет @ServiceActivator, нет @MessageGateways, нет прокси, все явно и многословно; управляемый простым запланированным исполнителем JDK на стороне клиента; сервер и клиент в отдельных конфигах и проектах.

HeartbeatClientConfig

@Configuration
@EnableIntegration
public class HeartbeatClientConfig {

    @Bean
    public MessageChannel outboudChannel() {
        return new DirectChannel();
    }

    @Bean
    public PollableChannel inboundChannel() {
        return new QueueChannel();
    }

    @Bean
    public TcpNetClientConnectionFactory connectionFactory() {
        TcpNetClientConnectionFactory connectionFactory = new TcpNetClientConnectionFactory("localhost", 7777);
        return connectionFactory;
    }

    @Bean
    public TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter(
            TcpNetClientConnectionFactory connectionFactory,
            MessageChannel inboundChannel) {
        TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter = new TcpReceivingChannelAdapter();
        heartbeatReceivingMessageAdapter.setConnectionFactory(connectionFactory);
        heartbeatReceivingMessageAdapter.setOutputChannel(inboundChannel); // ???
        heartbeatReceivingMessageAdapter.setClientMode(true);
        return heartbeatReceivingMessageAdapter;
    }

    @Bean
    public TcpSendingMessageHandler heartbeatSendingMessageHandler(
            TcpNetClientConnectionFactory connectionFactory) {
        TcpSendingMessageHandler heartbeatSendingMessageHandler = new TcpSendingMessageHandler();
        heartbeatSendingMessageHandler.setConnectionFactory(connectionFactory);
        return heartbeatSendingMessageHandler;
    }

    @Bean
    public IntegrationFlow heartbeatClientFlow(
            TcpNetClientConnectionFactory connectionFactory,
            TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter,
            TcpSendingMessageHandler heartbeatSendingMessageHandler,
            MessageChannel outboudChannel) {
        return IntegrationFlows
                .from(outboudChannel) // ??????
                .// adapter ???????????
                .// gateway ???????????
                .// handler ???????????
                .get();
    }

    @Bean
    public HeartbeatClient heartbeatClient(
            MessageChannel outboudChannel,
            PollableChannel inboundChannel) {
        return new HeartbeatClient(outboudChannel, inboundChannel);
    }
}

HeartbeatClient

public class HeartbeatClient {
    private final MessageChannel outboudChannel;
    private final PollableChannel inboundChannel;
    private final Logger log = LogManager.getLogger(HeartbeatClient.class);

    public HeartbeatClient(MessageChannel outboudChannel, PollableChannel inboundChannel) {
        this.inboundChannel = inboundChannel;
        this.outboudChannel = outboudChannel;
    }

    @EventListener
    public void initializaAfterContextIsReady(ContextRefreshedEvent event) {
        log.info("Starting Heartbeat client...");
        start();
    }

    public void start() {
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
            while (true) {
                try {
                    log.info("Sending Heartbeat");
                    outboudChannel.send(new GenericMessage<String>("status"));
                    Message<?> message = inboundChannel.receive(1000);
                    if (message == null) {
                        log.error("Heartbeat timeouted");
                    } else {
                        String messageStr = new String((byte[]) message.getPayload());
                        if (messageStr.equals("OK")) {
                            log.info("Heartbeat OK response received");
                        } else {
                            log.error("Unexpected message content from server: " + messageStr);
                        }
                    }
                } catch (Exception e) {
                    log.error(e);
                }
            }
        }, 0, 10000, TimeUnit.SECONDS);
    }
}

HeartbeatServerConfig

@Configuration
@EnableIntegration
public class HeartbeatServerConfig {

    @Bean
    public MessageChannel outboudChannel() {
        return new DirectChannel();
    }

    @Bean
    public PollableChannel inboundChannel() {
        return new QueueChannel();
    }

    @Bean
    public TcpNetServerConnectionFactory connectionFactory() {
        TcpNetServerConnectionFactory connectionFactory = new TcpNetServerConnectionFactory(7777);
        return connectionFactory;
    }

    @Bean
    public TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter(
            TcpNetServerConnectionFactory connectionFactory,
            MessageChannel outboudChannel) {
        TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter = new TcpReceivingChannelAdapter();
        heartbeatReceivingMessageAdapter.setConnectionFactory(connectionFactory);
        heartbeatReceivingMessageAdapter.setOutputChannel(outboudChannel);
        return heartbeatReceivingMessageAdapter;
    }

    @Bean
    public TcpSendingMessageHandler heartbeatSendingMessageHandler(
            TcpNetServerConnectionFactory connectionFactory) {
        TcpSendingMessageHandler heartbeatSendingMessageHandler = new TcpSendingMessageHandler();
        heartbeatSendingMessageHandler.setConnectionFactory(connectionFactory);
        return heartbeatSendingMessageHandler;
    }

    @Bean
    public IntegrationFlow heartbeatServerFlow(
            TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter,
            TcpSendingMessageHandler heartbeatSendingMessageHandler,
            MessageChannel outboudChannel) {
        return IntegrationFlows
                .from(heartbeatReceivingMessageAdapter) // ???????????????
                .handle(heartbeatSendingMessageHandler) // ???????????????
                .get();
    }

    @Bean
    public HeartbeatServer heartbeatServer(
            PollableChannel inboundChannel, 
            MessageChannel outboudChannel) {
        return new HeartbeatServer(inboundChannel, outboudChannel);
    }
}

HeartbeatServer

public class HeartbeatServer {
    private final PollableChannel inboundChannel;
    private final MessageChannel outboudChannel;
    private final Logger log = LogManager.getLogger(HeartbeatServer.class);

    public HeartbeatServer(PollableChannel inboundChannel, MessageChannel outboudChannel) {
        this.inboundChannel = inboundChannel;
        this.outboudChannel = outboudChannel;
    }

    @EventListener
    public void initializaAfterContextIsReady(ContextRefreshedEvent event) {
        log.info("Starting Heartbeat");
        start();
    }

    public void start() {
        Executors.newSingleThreadExecutor().execute(() -> {
            while (true) {
                try {
                    Message<?> message = inboundChannel.receive(1000);
                    if (message == null) {
                        log.error("Heartbeat timeouted");
                    } else {
                        String messageStr = new String((byte[]) message.getPayload());
                        if (messageStr.equals("status")) {
                            log.info("Heartbeat received");
                            outboudChannel.send(new GenericMessage<>("OK"));
                        } else {
                            log.error("Unexpected message content from client: " + messageStr);
                        }
                    }
                } catch (Exception e) {
                    log.error(e);
                }
            }
        });
    }
}

Бонусный вопрос

Почему канал может быть установлен в TcpReceivingChannelAdapter (входящий адаптер), но не в TcpSendingMessageHandler (исходящий адаптер)?

UPDATE
Вот полный исходный код проекта, если кто-то заинтересован, чтобы кто-нибудь его клонировал:
https://bitbucket.org/espinosa/spring-integration-tcp-demo
Я постараюсь поместить все предложенные решения там.

Ответы [ 2 ]

1 голос
/ 18 марта 2019

Для всех, кто интересуется, вот одно из рабочих решений, которые я сделал с помощью Гари Рассела. Все кредиты Гэри Расселу . Полный исходный код проекта здесь .

Основные характеристики:

  • IntegrationFlows: использовать только входящие и исходящие шлюзы.
  • Адаптеры или каналы не требуются;нет ServiceActivators или прокси Message Gateway.
  • Нет необходимости в ScheduledExecutor или Executors;код клиента и сервера получил значение
  • IntegrationFlow напрямую вызывает методы для класса клиента и класса сервера;Мне нравится этот тип явного соединения.
  • Разделение клиентского класса на две части, два метода: часть, производящая запрос, и часть, обрабатывающая ответ;таким образом, он может быть лучше связан с потоками.
  • явно определяет clientConnectionFactory / serverConnectionFactory.Таким образом, другие вещи можно будет явно настроить позже.

HeartbeatClientConfig

@Bean
public IntegrationFlow heartbeatClientFlow(
        TcpNetClientConnectionFactory clientConnectionFactory,
        HeartbeatClient heartbeatClient) {
    return IntegrationFlows.from(heartbeatClient::send,  e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(5))))
            .handle(Tcp.outboundGateway(clientConnectionFactory))
            .handle(heartbeatClient::receive)
            .get();
}

HeartbeatClient

public class HeartbeatClient {
    private final Logger log = LogManager.getLogger(HeartbeatClient.class);

    public GenericMessage<String> send() {
        log.info("Sending Heartbeat");
        return new GenericMessage<String>("status");
    }

    public Object receive(byte[] payload, MessageHeaders messageHeaders) { // LATER: use transformer() to receive String here
        String messageStr = new String(payload);
        if (messageStr.equals("OK")) {
            log.info("Heartbeat OK response received");
        } else {
            log.error("Unexpected message content from server: " + messageStr);
        }
        return null;
    }
}

HeartbeatServerConfig

@Bean
public IntegrationFlow heartbeatServerFlow(
        TcpNetServerConnectionFactory serverConnectionFactory,
        HeartbeatServer heartbeatServer) {
    return IntegrationFlows
            .from(Tcp.inboundGateway(serverConnectionFactory))
            .handle(heartbeatServer::processRequest)
            .get();
}

HeartbeatServer

public class HeartbeatServer {
    private final Logger log = LogManager.getLogger(HeartbeatServer.class);

    public Message<String> processRequest(byte[] payload, MessageHeaders messageHeaders) {
        String messageStr = new String(payload);
        if (messageStr.equals("status")) {
            log.info("Heartbeat received");
            return new GenericMessage<>("OK");
        } else {
            log.error("Unexpected message content from client: " + messageStr);
            return null;
        }

    }
}
1 голос
/ 14 марта 2019

С DSL намного проще ...

@SpringBootApplication
@EnableScheduling
public class So55154418Application {

    public static void main(String[] args) {
        SpringApplication.run(So55154418Application.class, args);
    }

    @Bean
    public IntegrationFlow server() {
        return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1234)))
                .transform(Transformers.objectToString())
                .log()
                .handle((p, h) -> "OK")
                .get();
    }

    @Bean
    public IntegrationFlow client() {
        return IntegrationFlows.from(Gate.class)
                .handle(Tcp.outboundGateway(Tcp.netClient("localhost", 1234)))
                .transform(Transformers.objectToString())
                .handle((p, h) -> {
                    System.out.println("Received:" + p);
                    return null;
                })
                .get();
    }

    @Bean
    @DependsOn("client")
    public Runner runner(Gate gateway) {
        return new Runner(gateway);
    }

    public static class Runner {

        private final Gate gateway;

        public Runner(Gate gateway) {
            this.gateway = gateway;
        }

        @Scheduled(fixedDelay = 5000)
        public void run() {
            this.gateway.send("foo");
        }

    }

    public interface Gate {

        void send(String out);

    }

}

Или получить ответ от метода Gate ...

    @Bean
    public IntegrationFlow client() {
        return IntegrationFlows.from(Gate.class)
                .handle(Tcp.outboundGateway(Tcp.netClient("localhost", 1234)))
                .transform(Transformers.objectToString())
                .get();
    }

    @Bean
    @DependsOn("client")
    public Runner runner(Gate gateway) {
        return new Runner(gateway);
    }

    public static class Runner {

        private final Gate gateway;

        public Runner(Gate gateway) {
            this.gateway = gateway;
        }

        @Scheduled(fixedDelay = 5000)
        public void run() {
            String reply = this.gateway.sendAndReceive("foo"); // null for timeout
            System.out.println("Received:" + reply);
        }

    }

    public interface Gate {

        @Gateway(replyTimeout = 5000)
        String sendAndReceive(String out);

    }

Бонус:

Конечные точки потребления на самом деле состоят из 2 бинов; потребитель и обработчик сообщений. Канал идет на потребителя. Смотри здесь .

EDIT

Альтернатива для одного компонента для клиента ...

@Bean
public IntegrationFlow client() {
    return IntegrationFlows.from(() -> "foo", 
                    e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(5))))
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", 1234)))
            .transform(Transformers.objectToString())
            .handle((p, h) -> {
                System.out.println("Received:" + p);
                return null;
            })
            .get();
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...