Spring-Integration: Ответ Tcp-сервера не отправляется при исключении - PullRequest
1 голос
/ 11 марта 2019

Я перенес устаревший код tcp-сервера в spring-boot и добавил зависимости для Spring-интеграции (на основе аннотаций) для обработки соединений через tcp-сокеты.

Мой входящий канал - tcpIn (), исходящий канал - serviceChannel (), и я создал собственный канал [exceptionEventChannel ()] для хранения сообщений о событиях исключения.

У меня есть собственный метод сериализатора / Deserialier (ByteArrayLengthPrefixSerializer () расширяет AbstractPooledBufferByteArraySerializer) и метод MessageHandler @ServiceActivator для отправки ответа обратно клиенту tcp.


//SpringBoot 2.0.3.RELEASE, Spring Integration 5.0.6.RELEASE

package com.test.config;


import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.event.inbound.ApplicationEventListeningMessageProducer;
import org.springframework.integration.ip.IpHeaders;
import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter;
import org.springframework.integration.ip.tcp.TcpSendingMessageHandler;
import org.springframework.integration.ip.tcp.connection.*;
import org.springframework.integration.ip.tcp.serializer.TcpDeserializationExceptionEvent;
import org.springframework.integration.router.ErrorMessageExceptionTypeRouter;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessagingException;

import java.io.IOException;

@Configuration
@IntegrationComponentScan
public class TcpConfiguration {
    @SuppressWarnings("unused")
    @Value("${tcp.connection.port}")
    private int tcpPort;

    @Bean
    TcpConnectionEventListener customerTcpListener() {
        return new TcpConnectionEventListener();
    }

    @Bean
    public MessageChannel tcpIn() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel serviceChannel() {
        return new DirectChannel();
    }

    @ConditionalOnMissingBean(name = "errorChannel")
    @Bean
    public MessageChannel errorChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel exceptionEventChannel() {
        return new DirectChannel();
    }

    @Bean
    public ByteArrayLengthPrefixSerializer byteArrayLengthPrefixSerializer() {
        ByteArrayLengthPrefixSerializer byteArrayLengthPrefixSerializer = new ByteArrayLengthPrefixSerializer();
        byteArrayLengthPrefixSerializer.setMaxMessageSize(98304); //max allowed size set to 96kb
        return byteArrayLengthPrefixSerializer;
    }

    @Bean
    public AbstractServerConnectionFactory tcpNetServerConnectionFactory() {
        TcpNetServerConnectionFactory tcpServerCf = new TcpNetServerConnectionFactory(tcpPort);
        tcpServerCf.setSerializer(byteArrayLengthPrefixSerializer());
        tcpServerCf.setDeserializer(byteArrayLengthPrefixSerializer());
        return tcpServerCf;

    }

    @Bean
    public TcpReceivingChannelAdapter tcpReceivingChannelAdapter() {
        TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
        adapter.setConnectionFactory(tcpNetServerConnectionFactory());
        adapter.setOutputChannel(tcpIn());
        adapter.setErrorChannel(exceptionEventChannel());
        return adapter;
    }

    @ServiceActivator(inputChannel = "exceptionEventChannel", outputChannel = "serviceChannel")
    public String handle(Message<MessagingException> msg) {
        //String unfilteredMessage = new String(byteMessage, StandardCharsets.US_ASCII);
        System.out.println("-----------------EXCEPTION ==> " + msg);
        return msg.toString();
    }

    @Transformer(inputChannel = "errorChannel", outputChannel = "serviceChannel")
    public String transformer(String msg) {
        //String unfilteredMessage = new String(byteMessage, StandardCharsets.US_ASCII);
        System.out.println("-----------------ERROR ==> " + msg);
        return msg.toString();
    }


    @ServiceActivator(inputChannel = "serviceChannel")
    @Bean
    public TcpSendingMessageHandler out(AbstractServerConnectionFactory cf) {
        TcpSendingMessageHandler tcpSendingMessageHandler = new TcpSendingMessageHandler();
        tcpSendingMessageHandler.setConnectionFactory(cf);
        return tcpSendingMessageHandler;
    }


    @Bean
    public ApplicationListener<TcpDeserializationExceptionEvent> listener() {
        return new ApplicationListener<TcpDeserializationExceptionEvent>() {

            @Override
            public void onApplicationEvent(TcpDeserializationExceptionEvent tcpDeserializationExceptionEvent) {
                exceptionEventChannel().send(MessageBuilder.withPayload(tcpDeserializationExceptionEvent.getCause())
                        .build());
            }

        };
    }
}
* 100p * Сообщения в сообщениях вотправляется методу @ServiceActivator внутри отдельного класса @Component, который структурирован следующим образом:
@Component
public class TcpServiceActivator {

    @Autowired
    public TcpServiceActivator() {
    }


    @ServiceActivator(inputChannel = "tcpIn", outputChannel = "serviceChannel")
    public String service(byte[] byteMessage) {
     // Business Logic returns String Ack Response
    }

У меня нет проблем при запуске сценария успеха.Мой Tcp TestClient получает ответ Ack, как и ожидалось.

Однако, когда я пытаюсь смоделировать исключение, скажем, Exserializer Exception, сообщение об исключении не отправляется обратно как ответ Tcp Client.Я вижу, что мой приемник приложений получает TcpDeserializationExceptionEvent и отправляет сообщение в exceptionEventChannel.Дескриптор метода @ServiceActivator (Message msg) также печатает мое сообщение об исключении.Но он никогда не достигает точек останова (в режиме отладки) внутри метода MessageHandler out (AbstractServerConnectionFactory cf).

Я изо всех сил пытаюсь понять, что происходит не так.Заранее благодарен за любую помощь.


ОБНОВЛЕНИЕ: Я заметил, что Сокет закрыт из-за исключительной ситуации, прежде чем ответ может быть отправлен.Я пытаюсь найти способ обойти это

ОБНОВЛЕНИЕ РЕШЕНИЯ (12 марта 2019 г.):

С разрешения Гэри, я отредактировал свой десериализатор так, чтобы он возвращал сообщение, которое может быть отслежено @Метод маршрутизатора и перенаправлен на errorChannel.ServiceActivator, прослушивающий канал ошибок, затем отправляет желаемое сообщение об ошибке в outputChannel.Это решение, кажется, работает.

Мой метод десериализатора внутри ByteArrayLengthPrefixSerializer возвращает «специальное значение», как рекомендовано Гэри, вместо исходного сообщения inputStream.

    public byte[] doDeserialize(InputStream inputStream, byte[] buffer) throws IOException {
        boolean isValidMessage = false;
        try {
            int messageLength = this.readPrefix(inputStream);
            if (messageLength > 0 && fillUntilMaxDeterminedSize(inputStream, buffer, messageLength)) {
                return this.copyToSizedArray(buffer, messageLength);
            }
            return EventType.MSG_INVALID.getName().getBytes(); 
        } catch (SoftEndOfStreamException eose) {
            return EventType.MSG_INVALID.getName().getBytes();
        }
    }

Я также создал несколько новых каналов для размещения моего маршрутизатора, так чтопоток выглядит следующим образом:

поток успеха tcpIn (@Router) -> serviceChannel (@serviceActivator, который содержит бизнес-логику) -> outputChannel (@serviceActivator, который отправляет ответ клиенту) *

поток исключенийtcpIn (@Router) -> errorChannel (@serviceActivator, который готовит ответное сообщение об ошибке) -> outputChannel (@serviceActivator, который отправляет ответ клиенту)

My @Router и errorHandling @serviceActivator -

    @Router(inputChannel = "tcpIn", defaultOutputChannel = "errorChannel")
    public String messageRouter(byte[] byteMessage) {
        String unfilteredMessage = new String(byteMessage, StandardCharsets.US_ASCII);
        System.out.println("------------------> "+unfilteredMessage);
        if (Arrays.equals(EventType.MSG_INVALID.getName().getBytes(), byteMessage)) {
            return "errorChannel";
        }
        return "serviceChannel";
    }


    @ServiceActivator(inputChannel = "errorChannel", outputChannel = "outputChannel")
    public String errorHandler(byte[] byteMessage) {
        return Message.ACK_RETRY;
    }

1 Ответ

0 голосов
/ 11 марта 2019

Канал ошибок предназначен для обработки исключений, возникающих при обработке сообщения .Ошибки десериализации происходят до создания сообщения (десериализатор декодирует полезную нагрузку для сообщения).

Исключения десериализации являются фатальными и, как вы заметили, сокетзакрыто.

Один из вариантов - перехватить исключение в десериализаторе и вернуть «специальное» значение, которое указывает на возникшее исключение десериализации, а затем проверить это значение в основном потоке.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...