Пожалуйста,
Не могли бы вы помочь с реализацией простого, эхо-стиля, службы сокетов Heartbeat TCP в Spring Integration DSL? Точнее, как подключить адаптер / обработчик / шлюз к IntegrationFlows
на стороне клиента и сервера. Трудно найти практические примеры для взаимодействия клиент-сервер Spring Integration DSL и TCP / IP.
Я думаю, я прибил большую часть кода, это просто то, что нужно соединить все вместе в IntegrationFlow
.
В примерах SI есть пример echo service , но он написан в «старой» конфигурации XML, и я действительно изо всех сил пытаюсь преобразовать его в конфигурацию с помощью кода.
Служба «Мой пульс» - это простой сервер, ожидающий, пока клиент запросит «статус», и ответит «ОК».
Нет @ServiceActivator
, нет @MessageGateways
, нет прокси, все явно и многословно; управляемый простым запланированным исполнителем JDK на стороне клиента; сервер и клиент в отдельных конфигах и проектах.
HeartbeatClientConfig
@Configuration
@EnableIntegration
public class HeartbeatClientConfig {
@Bean
public MessageChannel outboudChannel() {
return new DirectChannel();
}
@Bean
public PollableChannel inboundChannel() {
return new QueueChannel();
}
@Bean
public TcpNetClientConnectionFactory connectionFactory() {
TcpNetClientConnectionFactory connectionFactory = new TcpNetClientConnectionFactory("localhost", 7777);
return connectionFactory;
}
@Bean
public TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter(
TcpNetClientConnectionFactory connectionFactory,
MessageChannel inboundChannel) {
TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter = new TcpReceivingChannelAdapter();
heartbeatReceivingMessageAdapter.setConnectionFactory(connectionFactory);
heartbeatReceivingMessageAdapter.setOutputChannel(inboundChannel); // ???
heartbeatReceivingMessageAdapter.setClientMode(true);
return heartbeatReceivingMessageAdapter;
}
@Bean
public TcpSendingMessageHandler heartbeatSendingMessageHandler(
TcpNetClientConnectionFactory connectionFactory) {
TcpSendingMessageHandler heartbeatSendingMessageHandler = new TcpSendingMessageHandler();
heartbeatSendingMessageHandler.setConnectionFactory(connectionFactory);
return heartbeatSendingMessageHandler;
}
@Bean
public IntegrationFlow heartbeatClientFlow(
TcpNetClientConnectionFactory connectionFactory,
TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter,
TcpSendingMessageHandler heartbeatSendingMessageHandler,
MessageChannel outboudChannel) {
return IntegrationFlows
.from(outboudChannel) // ??????
.// adapter ???????????
.// gateway ???????????
.// handler ???????????
.get();
}
@Bean
public HeartbeatClient heartbeatClient(
MessageChannel outboudChannel,
PollableChannel inboundChannel) {
return new HeartbeatClient(outboudChannel, inboundChannel);
}
}
HeartbeatClient
public class HeartbeatClient {
private final MessageChannel outboudChannel;
private final PollableChannel inboundChannel;
private final Logger log = LogManager.getLogger(HeartbeatClient.class);
public HeartbeatClient(MessageChannel outboudChannel, PollableChannel inboundChannel) {
this.inboundChannel = inboundChannel;
this.outboudChannel = outboudChannel;
}
@EventListener
public void initializaAfterContextIsReady(ContextRefreshedEvent event) {
log.info("Starting Heartbeat client...");
start();
}
public void start() {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
while (true) {
try {
log.info("Sending Heartbeat");
outboudChannel.send(new GenericMessage<String>("status"));
Message<?> message = inboundChannel.receive(1000);
if (message == null) {
log.error("Heartbeat timeouted");
} else {
String messageStr = new String((byte[]) message.getPayload());
if (messageStr.equals("OK")) {
log.info("Heartbeat OK response received");
} else {
log.error("Unexpected message content from server: " + messageStr);
}
}
} catch (Exception e) {
log.error(e);
}
}
}, 0, 10000, TimeUnit.SECONDS);
}
}
HeartbeatServerConfig
@Configuration
@EnableIntegration
public class HeartbeatServerConfig {
@Bean
public MessageChannel outboudChannel() {
return new DirectChannel();
}
@Bean
public PollableChannel inboundChannel() {
return new QueueChannel();
}
@Bean
public TcpNetServerConnectionFactory connectionFactory() {
TcpNetServerConnectionFactory connectionFactory = new TcpNetServerConnectionFactory(7777);
return connectionFactory;
}
@Bean
public TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter(
TcpNetServerConnectionFactory connectionFactory,
MessageChannel outboudChannel) {
TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter = new TcpReceivingChannelAdapter();
heartbeatReceivingMessageAdapter.setConnectionFactory(connectionFactory);
heartbeatReceivingMessageAdapter.setOutputChannel(outboudChannel);
return heartbeatReceivingMessageAdapter;
}
@Bean
public TcpSendingMessageHandler heartbeatSendingMessageHandler(
TcpNetServerConnectionFactory connectionFactory) {
TcpSendingMessageHandler heartbeatSendingMessageHandler = new TcpSendingMessageHandler();
heartbeatSendingMessageHandler.setConnectionFactory(connectionFactory);
return heartbeatSendingMessageHandler;
}
@Bean
public IntegrationFlow heartbeatServerFlow(
TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter,
TcpSendingMessageHandler heartbeatSendingMessageHandler,
MessageChannel outboudChannel) {
return IntegrationFlows
.from(heartbeatReceivingMessageAdapter) // ???????????????
.handle(heartbeatSendingMessageHandler) // ???????????????
.get();
}
@Bean
public HeartbeatServer heartbeatServer(
PollableChannel inboundChannel,
MessageChannel outboudChannel) {
return new HeartbeatServer(inboundChannel, outboudChannel);
}
}
HeartbeatServer
public class HeartbeatServer {
private final PollableChannel inboundChannel;
private final MessageChannel outboudChannel;
private final Logger log = LogManager.getLogger(HeartbeatServer.class);
public HeartbeatServer(PollableChannel inboundChannel, MessageChannel outboudChannel) {
this.inboundChannel = inboundChannel;
this.outboudChannel = outboudChannel;
}
@EventListener
public void initializaAfterContextIsReady(ContextRefreshedEvent event) {
log.info("Starting Heartbeat");
start();
}
public void start() {
Executors.newSingleThreadExecutor().execute(() -> {
while (true) {
try {
Message<?> message = inboundChannel.receive(1000);
if (message == null) {
log.error("Heartbeat timeouted");
} else {
String messageStr = new String((byte[]) message.getPayload());
if (messageStr.equals("status")) {
log.info("Heartbeat received");
outboudChannel.send(new GenericMessage<>("OK"));
} else {
log.error("Unexpected message content from client: " + messageStr);
}
}
} catch (Exception e) {
log.error(e);
}
}
});
}
}
Бонусный вопрос
Почему канал может быть установлен в TcpReceivingChannelAdapter (входящий адаптер), но не в TcpSendingMessageHandler (исходящий адаптер)?
UPDATE
Вот полный исходный код проекта, если кто-то заинтересован, чтобы кто-нибудь его клонировал:
https://bitbucket.org/espinosa/spring-integration-tcp-demo
Я постараюсь поместить все предложенные решения там.