Я перенес устаревший код tcp-сервера в spring-boot и добавил зависимости для Spring-интеграции (на основе аннотаций) для обработки соединений через tcp-сокеты.
Мой входящий канал - tcpIn (), исходящий канал - serviceChannel (), и я создал собственный канал [exceptionEventChannel ()] для хранения сообщений о событиях исключения.
У меня есть собственный метод сериализатора / Deserialier (ByteArrayLengthPrefixSerializer () расширяет AbstractPooledBufferByteArraySerializer) и метод MessageHandler @ServiceActivator для отправки ответа обратно клиенту tcp.
//SpringBoot 2.0.3.RELEASE, Spring Integration 5.0.6.RELEASE
package com.test.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.event.inbound.ApplicationEventListeningMessageProducer;
import org.springframework.integration.ip.IpHeaders;
import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter;
import org.springframework.integration.ip.tcp.TcpSendingMessageHandler;
import org.springframework.integration.ip.tcp.connection.*;
import org.springframework.integration.ip.tcp.serializer.TcpDeserializationExceptionEvent;
import org.springframework.integration.router.ErrorMessageExceptionTypeRouter;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessagingException;
import java.io.IOException;
@Configuration
@IntegrationComponentScan
public class TcpConfiguration {
@SuppressWarnings("unused")
@Value("${tcp.connection.port}")
private int tcpPort;
@Bean
TcpConnectionEventListener customerTcpListener() {
return new TcpConnectionEventListener();
}
@Bean
public MessageChannel tcpIn() {
return new DirectChannel();
}
@Bean
public MessageChannel serviceChannel() {
return new DirectChannel();
}
@ConditionalOnMissingBean(name = "errorChannel")
@Bean
public MessageChannel errorChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel exceptionEventChannel() {
return new DirectChannel();
}
@Bean
public ByteArrayLengthPrefixSerializer byteArrayLengthPrefixSerializer() {
ByteArrayLengthPrefixSerializer byteArrayLengthPrefixSerializer = new ByteArrayLengthPrefixSerializer();
byteArrayLengthPrefixSerializer.setMaxMessageSize(98304); //max allowed size set to 96kb
return byteArrayLengthPrefixSerializer;
}
@Bean
public AbstractServerConnectionFactory tcpNetServerConnectionFactory() {
TcpNetServerConnectionFactory tcpServerCf = new TcpNetServerConnectionFactory(tcpPort);
tcpServerCf.setSerializer(byteArrayLengthPrefixSerializer());
tcpServerCf.setDeserializer(byteArrayLengthPrefixSerializer());
return tcpServerCf;
}
@Bean
public TcpReceivingChannelAdapter tcpReceivingChannelAdapter() {
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
adapter.setConnectionFactory(tcpNetServerConnectionFactory());
adapter.setOutputChannel(tcpIn());
adapter.setErrorChannel(exceptionEventChannel());
return adapter;
}
@ServiceActivator(inputChannel = "exceptionEventChannel", outputChannel = "serviceChannel")
public String handle(Message<MessagingException> msg) {
//String unfilteredMessage = new String(byteMessage, StandardCharsets.US_ASCII);
System.out.println("-----------------EXCEPTION ==> " + msg);
return msg.toString();
}
@Transformer(inputChannel = "errorChannel", outputChannel = "serviceChannel")
public String transformer(String msg) {
//String unfilteredMessage = new String(byteMessage, StandardCharsets.US_ASCII);
System.out.println("-----------------ERROR ==> " + msg);
return msg.toString();
}
@ServiceActivator(inputChannel = "serviceChannel")
@Bean
public TcpSendingMessageHandler out(AbstractServerConnectionFactory cf) {
TcpSendingMessageHandler tcpSendingMessageHandler = new TcpSendingMessageHandler();
tcpSendingMessageHandler.setConnectionFactory(cf);
return tcpSendingMessageHandler;
}
@Bean
public ApplicationListener<TcpDeserializationExceptionEvent> listener() {
return new ApplicationListener<TcpDeserializationExceptionEvent>() {
@Override
public void onApplicationEvent(TcpDeserializationExceptionEvent tcpDeserializationExceptionEvent) {
exceptionEventChannel().send(MessageBuilder.withPayload(tcpDeserializationExceptionEvent.getCause())
.build());
}
};
}
}
* 100p * Сообщения в сообщениях вотправляется методу @ServiceActivator внутри отдельного класса @Component, который структурирован следующим образом:
@Component
public class TcpServiceActivator {
@Autowired
public TcpServiceActivator() {
}
@ServiceActivator(inputChannel = "tcpIn", outputChannel = "serviceChannel")
public String service(byte[] byteMessage) {
// Business Logic returns String Ack Response
}
У меня нет проблем при запуске сценария успеха.Мой Tcp TestClient получает ответ Ack, как и ожидалось.
Однако, когда я пытаюсь смоделировать исключение, скажем, Exserializer Exception, сообщение об исключении не отправляется обратно как ответ Tcp Client.Я вижу, что мой приемник приложений получает TcpDeserializationExceptionEvent и отправляет сообщение в exceptionEventChannel.Дескриптор метода @ServiceActivator (Message msg) также печатает мое сообщение об исключении.Но он никогда не достигает точек останова (в режиме отладки) внутри метода MessageHandler out (AbstractServerConnectionFactory cf).
Я изо всех сил пытаюсь понять, что происходит не так.Заранее благодарен за любую помощь.
ОБНОВЛЕНИЕ: Я заметил, что Сокет закрыт из-за исключительной ситуации, прежде чем ответ может быть отправлен.Я пытаюсь найти способ обойти это
ОБНОВЛЕНИЕ РЕШЕНИЯ (12 марта 2019 г.):
С разрешения Гэри, я отредактировал свой десериализатор так, чтобы он возвращал сообщение, которое может быть отслежено @Метод маршрутизатора и перенаправлен на errorChannel.ServiceActivator, прослушивающий канал ошибок, затем отправляет желаемое сообщение об ошибке в outputChannel.Это решение, кажется, работает.
Мой метод десериализатора внутри ByteArrayLengthPrefixSerializer возвращает «специальное значение», как рекомендовано Гэри, вместо исходного сообщения inputStream.
public byte[] doDeserialize(InputStream inputStream, byte[] buffer) throws IOException {
boolean isValidMessage = false;
try {
int messageLength = this.readPrefix(inputStream);
if (messageLength > 0 && fillUntilMaxDeterminedSize(inputStream, buffer, messageLength)) {
return this.copyToSizedArray(buffer, messageLength);
}
return EventType.MSG_INVALID.getName().getBytes();
} catch (SoftEndOfStreamException eose) {
return EventType.MSG_INVALID.getName().getBytes();
}
}
Я также создал несколько новых каналов для размещения моего маршрутизатора, так чтопоток выглядит следующим образом:
поток успеха tcpIn (@Router) -> serviceChannel (@serviceActivator, который содержит бизнес-логику) -> outputChannel (@serviceActivator, который отправляет ответ клиенту) *
поток исключенийtcpIn (@Router) -> errorChannel (@serviceActivator, который готовит ответное сообщение об ошибке) -> outputChannel (@serviceActivator, который отправляет ответ клиенту)
My @Router и errorHandling @serviceActivator -
@Router(inputChannel = "tcpIn", defaultOutputChannel = "errorChannel")
public String messageRouter(byte[] byteMessage) {
String unfilteredMessage = new String(byteMessage, StandardCharsets.US_ASCII);
System.out.println("------------------> "+unfilteredMessage);
if (Arrays.equals(EventType.MSG_INVALID.getName().getBytes(), byteMessage)) {
return "errorChannel";
}
return "serviceChannel";
}
@ServiceActivator(inputChannel = "errorChannel", outputChannel = "outputChannel")
public String errorHandler(byte[] byteMessage) {
return Message.ACK_RETRY;
}