Я использую
- spring -gration-java-dsl-1.2.3.RELEASE
- spring -gration-ip-4.3.17.RELEASE
- spring -gration-http-4.3.17.RELEASE
Данный код предназначен для динамической генерации TCP-соединений.Я определяю enricherHeader с уникальным заголовком, называемым «connectionId».Этот «connectionId» будет именем созданного соединения.
@Service
@EnableIntegration
public class TcpConnectionsHolder {
private static final Logger LOGGER = LoggerFactory.getLogger(TcpConnectionsHolder.class);
@Autowired
private IntegrationFlowContext flowContext;
// @Autowired
// private BridgeMessageEndpoint bme;
@Value("${socket.tcp.headBytes}")
private int headBytes;
@Value("${socket.tcp.retryInterval}")
private int retryInterval;
//@Qualifier("sendToApi")
@Autowired
private MessageChannel fromTcp;
private final static String CONNECTION_SUFFIX="_connection";
public IntegrationFlow createTcpConnection(String connectionId, String host, int port, int headBytes,
int retryInterval)
{
LOGGER.debug("createTcpConnection -> connectionId: {} - host: {} - port: {} - headBytes: {} - retryInterval: {}"
,connectionId,host,port,headBytes,retryInterval);
IntegrationFlow ifr = existsConnection(connectionId);
if (ifr == null) {
TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
final ByteArrayLengthHeaderSerializer by = new ByteArrayLengthHeaderSerializer(headBytes);
cf.setSingleUse(false);
cf.setSoKeepAlive(true);
cf.setSerializer(by);
cf.setDeserializer(by);
cf.setComponentName(connectionId);
//Inbound Adapter
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
adapter.setConnectionFactory(cf);
adapter.setClientMode(true);
adapter.setErrorChannelName("errorChannel");
adapter.setRetryInterval(retryInterval);
adapter.setOutputChannel(fromTcp);
//OutBound Adapter
TcpSendingMessageHandler sender = new TcpSendingMessageHandler();
sender.setConnectionFactory(cf);
ifr = IntegrationFlows
.from(adapter)
.enrichHeaders(h -> h.header("connec_id",connectionId))
.handle(sender).get();
this.flowContext.registration(ifr).id(connectionId+CONNECTION_SUFFIX).addBean(cf).register();
LOGGER.debug("createTcpConnection: Connection created");
}
return ifr;
}
Также у меня есть класс MessageEndpoint, в котором я определяю все методы serviceActivator
@Configuration
@MessageEndpoint
@EnableIntegration
public class BridgeMessageEndpoint{
private static final Logger LOGGER = LoggerFactory.getLogger(BridgeMessageEndpoint.class);
@Autowired
private ApplicationContext applicationContext;
@ServiceActivator(inputChannel = "fromTcp")
public void outbound(final String inMessage, final @Headers Map<String, Object> headerMap) {
sendToApi(inMessage, headerMap);
}
private void sendToApi(final String inMessage, final Map<String, Object> headerMap) {
LOGGER.debug("Recuperando el mensaje Hex {}", inMessage);
final PaymentOrder paymentOrder = new PaymentOrder();
paymentOrder.setMessage(inMessage);
final SplitterRestClient splitterRestClient = applicationContext.getBean(SplitterRestClient.class);
splitterRestClient.reportPayment(paymentOrder, headerMap);
}
@Bean
@ServiceActivator(inputChannel = "errorChannel")
public MessageHandler logger() {
final LoggingHandler loggingHandler = new LoggingHandler(LoggingHandler.Level.DEBUG.name());
loggingHandler.setLoggerName("Log");
return loggingHandler;
}
/*
* Calls TcpRouter to redirect the message to the correct TCP connection
*/
@Bean
public IntegrationFlow toTcp() {
return f -> f.route(new TcpRouter());
}
@Bean
public MessageChannel fromTcp() {
final DirectChannel channel = new DirectChannel();
channel.addInterceptor(new ChannelInterceptorAdapter() {
@Override
public Message<?> preSend(final Message<?> message, final MessageChannel channel) {
// Parse Message byte[] to StringHex
final byte[] bMessagePayload = (byte[]) message.getPayload();
return MessageBuilder.withPayload(Hex.encodeHexString(bMessagePayload))
.copyHeaders(message.getHeaders()).build();
}
});
return channel;
}
Сказав это.У меня есть несколько вопросов, что мне нужно ваша помощь.
В методе creationTcpconnection я пытаюсь создать динамические соединения, определяя адаптер получателя и SenderAdapter.
Если я закомментирую строку (adapter.setOutputChannel (fromTcp);), как я рекомендовалмне, это не работает мне выполнение serviceActivator (inputChannel = "fromTcp"), определенного в классе BridgeMessageEndpoint.
Как я уже говорил, мне нужно отправить новый заголовок с именем "connectionId", который определяет, какойСоединение получило сообщение от tcp-сервера для последующего ответа на то же соединение