Поток Spring Integration вызывает службу REST - PullRequest
0 голосов
/ 14 марта 2019

У меня нижеуказанный поток интеграции, определенный в моем проекте

///

public IntegrationFlow acarsEventFlow() {
    return IntegrationFlows
            //.from(Jms.messageDrivenChannelAdapter(this.acarsMqListener)) //Get Message from MQ
            .from(org.springframework.integration.jms.dsl.Jms.messageDrivenChannelAdapter(
                    org.springframework.integration.jms.dsl.Jms.container(this.acarsMqConnectionFactory, this.acarsQueue)
                    .transactionManager(transactionManager(this.acarsMqConnectionFactory))
                    .get()))
            .wireTap(ACARS_WIRE_TAP_CHNL) 
            .transform(agmTransformer, "parseXMLMessage") //
            .handle(acarsProcessor, "pushRawMessage") // (1)Call web service to push the message payload and if it fails then don't commit the transaction and rollback the message
            .transform(agmTransformer, "populateSmi") 
            .filter(acarsFilter,"filterMessageOnSmi") // 
            .transform(agmTransformer, "populateImi") //
            .filter(acarsFilter,"filterMessageOnSmiImi") //
            .transform(acarsProcessor,"processEvent") //
            .publishSubscribeChannel(pubSub -> pubSub
                    .subscribe(flow -> flow
                        .bridge(e -> e.order(Ordered.HIGHEST_PRECEDENCE))
                        .enrichHeaders(h -> h.headerExpression(KafkaHeaders.MESSAGE_KEY, "payload.flightNbr")) //Add flight number as key
                        .transform("payload.message") // publish the transformed message
                        .handle(Kafka.outboundChannelAdapter(kafkaTemplate).topic(acarsKafkaTopic))) //publish to kafka
                    .subscribe(flow -> flow
                        .channel(UPDATE_DATA_STORE_CHNL))) 
            .get(); 

}

///

Я получаю сообщение от MQ, запущенменеджер транзакций, обеспечивающий откат сообщения до тех пор, пока оно не будет обработано.Теперь в одном из методов дескриптора # pushRawMessage () [пожалуйста, обратитесь к комментарию (1). Вызовите веб-сервис, чтобы передать полезную нагрузку сообщения в приведенном выше фрагменте]. Мне нужно вызвать веб-сервис.В настоящее время я просто вызываю веб-сервис из обработчика - pushRawMessage ().Является ли хорошей идеей ввести Messaging Gateway для вызова стороннего веб-сервиса?если мы представим шлюз mEssaging, то как мы можем гарантировать, что исходное сообщение будет откатано, когда веб-сервис не работает?

1 Ответ

0 голосов
/ 14 марта 2019

Ничего страшного, чтобы это было так, как сейчас. Также хорошо использовать .gateway() для выполнения некоторого подпотока для этого процесса веб-службы. Пока все выполняется в одном потоке, когда вы используете только прямые каналы, все будет участвовать в одной транзакции. Поэтому любая ошибка в этом подпотоке вызовет откат транзакции.

Вы также можете использовать этот процесс веб-службы как async , если вы используете gateway(). Он все равно будет ждать ответа или ошибки в текущей теме. Итак, транзакция будет откатана снова.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...