Служба эхо-сокетов в Spring Integration DSL с использованием каналов и шлюзов - PullRequest
0 голосов
/ 25 марта 2019

Это вариант моего вопроса Как реализовать простой сервис эхо-сокетов в Spring Integration DSL . Были представлены хорошие рабочие решения, но я хотел бы изучить альтернативы. В частности, меня интересует решение, основанное на явном использовании входящих и исходящих каналов, в реализациях клиента и сервера. Это возможно?

Пока я смог придумать:

HeartbeatClientConfig

...
@Bean
public IntegrationFlow heartbeatClientFlow(
        TcpNetClientConnectionFactory clientConnectionFactory,
        MessageChannel outboundChannel,
        PollableChannel inboundChannel) {
    return IntegrationFlows
            .from(outboundChannel)
            .handle(Tcp.outboundGateway(clientConnectionFactory))
            .channel(inboundChannel)
            .get();
}
...

HeartbeatClient

public HeartbeatClient(MessageChannel outboudChannel, PollableChannel inboundChannel) {
    this.inboundChannel = inboundChannel;
    this.outboudChannel = outboudChannel;
}
...
void run() {
    // ..in scheduled intervals in loop 
    outboudChannel.send(new GenericMessage<String>("status"));
    Message<?> message = inboundChannel.receive(1000);
}

Клиентская часть работает нормально. Проблема на стороне сервера.

HeartbeatServer

public HeartbeatServer(PollableChannel inboundChannel, MessageChannel outboudChannel) {
    this.inboundChannel = inboundChannel;
    this.outboudChannel = outboudChannel;
}
...
void run() {
    // ..in some kind of loop
    Message<?> message = inboundChannel.receive(1000); // presumably a blocking call
    ...
    outboudChannel.send(new GenericMessage<>("OK"));
    ...
}

HeartbeatServerConfig
Здесь идет самая сложная часть, где я уверен, что я не прав. Я просто не знаю, что мне делать. Здесь я наивно использую обратный подход из клиентской реализации, где он работает; обратный в смысле переключения входящих и исходящих каналов в определении потока.

...
@Bean
public IntegrationFlow heartbeatServerFlow(
        MessageChannel outboundChannel,
        PollableChannel inboundChannel) {
    return IntegrationFlows
            .from(inboundChannel)
            .handle(Tcp.inboundGateway(Tcp.netServer(7777)))
            .channel(outboundChannel)
            .get();
}
...

Сервер не работает, выбрасывает загадочное исключение около Found ambiguous parameter type [class java.lang.Boolean] for method match ..., за которым следует длинный список методов Spring и Spring Integration.

Полный исходный код можно найти здесь .

Ответы [ 2 ]

1 голос
/ 25 марта 2019

Невозможно запустить поток на стороне сервера с каналом.

Поток начинается со шлюза;он обрабатывает все сообщения сокета.Когда он получает сообщение, он отправляет его на канал.

Вы можете сделать это ...

@Bean
public IntegrationFlow server(PollableChannel requests, MessageChannel replies) {
    return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1234))
                .replyChannel(replies))
            .transform(Transformers.objectToString())
            .channel(requests)
            .get();
}

Но я бы спросил, почему вы хотите, потому что теперь вы должны управлять своимсобственный поток для получения от канала запроса и записи в канал ответа.Чтобы это работало, заголовок replyChannel из сообщения запроса должен быть скопирован в ответное сообщение .На самом деле вам не нужен канал ответа;Вы можете отправить ответ непосредственно в заголовок replyChannel (это то, что происходит внутри, мы соединяем канал ответа с каналом заголовка).

Намного проще обрабатывать запрос в потоке шлюза.

0 голосов
/ 01 апреля 2019

Просто в дополнение к идеальному ответу Гари, вот полный код, если кому-то интересно.

Мне пришлось явно указать TcpNetServerConnectionFactory, чтобы установить ByteArrayLengthHeaderSerializer в качестве сериализатора / десериализатора. Это не работает без него.

HeartbeatServerConfig полный код

@Bean
public TcpNetServerConnectionFactory connectionFactory() {
    TcpNetServerConnectionFactory connectionFactory = new TcpNetServerConnectionFactory(7777);
    connectionFactory.setSerializer(new ByteArrayLengthHeaderSerializer());
    connectionFactory.setDeserializer(new ByteArrayLengthHeaderSerializer());
    return connectionFactory;
}

@Bean
public IntegrationFlow heartbeatServerFlow(
        TcpNetServerConnectionFactory connectionFactory,
        PollableChannel inboundChannel, 
        MessageChannel outboundChannel) {
    return IntegrationFlows.from(Tcp.inboundGateway(connectionFactory)
            .replyChannel(outboundChannel))
            .channel(inboundChannel)
            .get();
}

HeartbeatServer полный код

public void start() {
    Executors.newSingleThreadExecutor().execute(() -> {
        while (true) {
            try {
                Message<?> request = inboundChannel.receive();
                if (request == null) {
                    log.error("Heartbeat timeouted");
                } else {
                    MessageChannel outboudChannel = (MessageChannel)request.getHeaders().getReplyChannel();
                    String requestPayload = new String((byte[]) request.getPayload());
                    if (requestPayload.equals("status")) {
                        log.info("Heartbeat received");
                        outboudChannel.send(new GenericMessage<>("OK"));
                    } else {
                        log.error("Unexpected message content from client: " + requestPayload);
                    }
                }
            } catch (Exception e) {
                log.error(e);
            }
        }
    });
}

Важным битом, конечно же, является получение исходящего канала из самого сообщения запроса как: MessageChannel outboudChannel = (MessageChannel)request.getHeaders().getReplyChannel()

Полный код можно найти здесь.

...