Spring Enrich header при создании динамического соединения - PullRequest
0 голосов
/ 27 декабря 2018

Я использую

  • spring -gration-java-dsl-1.2.3.RELEASE
  • spring -gration-ip-4.3.17.RELEASE
  • spring -gration-http-4.3.17.RELEASE

Данный код предназначен для динамической генерации TCP-соединений.Я определяю enricherHeader с уникальным заголовком, называемым «connectionId».Этот «connectionId» будет именем созданного соединения.

@Service
@EnableIntegration
public class TcpConnectionsHolder {

private static final Logger LOGGER = LoggerFactory.getLogger(TcpConnectionsHolder.class);

@Autowired
private IntegrationFlowContext flowContext;

//    @Autowired
//    private BridgeMessageEndpoint bme;

@Value("${socket.tcp.headBytes}")
private int headBytes;

@Value("${socket.tcp.retryInterval}")
private int retryInterval;

//@Qualifier("sendToApi")
@Autowired
private MessageChannel fromTcp;

private final static String CONNECTION_SUFFIX="_connection";    

public IntegrationFlow createTcpConnection(String connectionId, String host, int port, int headBytes,
        int retryInterval) 
{

    LOGGER.debug("createTcpConnection ->  connectionId: {} - host: {} - port: {} - headBytes: {} - retryInterval: {}"
            ,connectionId,host,port,headBytes,retryInterval);
    IntegrationFlow ifr = existsConnection(connectionId);
    if (ifr == null) {
        TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
        final ByteArrayLengthHeaderSerializer by = new ByteArrayLengthHeaderSerializer(headBytes);
        cf.setSingleUse(false);
        cf.setSoKeepAlive(true);
        cf.setSerializer(by);
        cf.setDeserializer(by);
        cf.setComponentName(connectionId);

        //Inbound Adapter 
        TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
        adapter.setConnectionFactory(cf);
        adapter.setClientMode(true);
        adapter.setErrorChannelName("errorChannel");
        adapter.setRetryInterval(retryInterval);
        adapter.setOutputChannel(fromTcp);

        //OutBound Adapter
        TcpSendingMessageHandler sender = new TcpSendingMessageHandler();
        sender.setConnectionFactory(cf);

        ifr = IntegrationFlows
                .from(adapter)
                .enrichHeaders(h -> h.header("connec_id",connectionId))
                .handle(sender).get();

        this.flowContext.registration(ifr).id(connectionId+CONNECTION_SUFFIX).addBean(cf).register();
        LOGGER.debug("createTcpConnection: Connection created");
    }
    return ifr;
}

Также у меня есть класс MessageEndpoint, в котором я определяю все методы serviceActivator

@Configuration
@MessageEndpoint
@EnableIntegration
public class BridgeMessageEndpoint{

private static final Logger LOGGER = LoggerFactory.getLogger(BridgeMessageEndpoint.class);

@Autowired
private ApplicationContext applicationContext;

@ServiceActivator(inputChannel = "fromTcp")
public void outbound(final String inMessage, final @Headers Map<String, Object> headerMap) {
    sendToApi(inMessage, headerMap);
}

private void sendToApi(final String inMessage, final Map<String, Object> headerMap) {
    LOGGER.debug("Recuperando el mensaje Hex {}", inMessage);
    final PaymentOrder paymentOrder = new PaymentOrder();
    paymentOrder.setMessage(inMessage);
    final SplitterRestClient splitterRestClient = applicationContext.getBean(SplitterRestClient.class);
    splitterRestClient.reportPayment(paymentOrder, headerMap);
}

@Bean
@ServiceActivator(inputChannel = "errorChannel")
public MessageHandler logger() {
    final LoggingHandler loggingHandler = new LoggingHandler(LoggingHandler.Level.DEBUG.name());
    loggingHandler.setLoggerName("Log");
    return loggingHandler;
}
/*
 * Calls TcpRouter to redirect the message to the correct TCP connection
 */
@Bean
public IntegrationFlow toTcp() {
    return f -> f.route(new TcpRouter());
}


@Bean
public MessageChannel fromTcp() {
    final DirectChannel channel = new DirectChannel();
    channel.addInterceptor(new ChannelInterceptorAdapter() {
        @Override
        public Message<?> preSend(final Message<?> message, final MessageChannel channel) {
            // Parse Message byte[] to StringHex
            final byte[] bMessagePayload = (byte[]) message.getPayload();
            return MessageBuilder.withPayload(Hex.encodeHexString(bMessagePayload))
                    .copyHeaders(message.getHeaders()).build();
        }
    });
    return channel;
}

Сказав это.У меня есть несколько вопросов, что мне нужно ваша помощь.

В методе creationTcpconnection я пытаюсь создать динамические соединения, определяя адаптер получателя и SenderAdapter.

Если я закомментирую строку (adapter.setOutputChannel (fromTcp);), как я рекомендовалмне, это не работает мне выполнение serviceActivator (inputChannel = "fromTcp"), определенного в классе BridgeMessageEndpoint.

Как я уже говорил, мне нужно отправить новый заголовок с именем "connectionId", который определяет, какойСоединение получило сообщение от tcp-сервера для последующего ответа на то же соединение

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...