SpringIntegration публикует на kafka и обновляет couchbase после получения сообщения от MQ - PullRequest
0 голосов
/ 28 января 2019

Я использую SpringIntegration IntegrationFlow для определения потока сообщений и использовал Jms.messageDrivenChannelAdapter для получения сообщения из MQ, теперь мне нужно проанализировать его, отправить его в KAFKA и обновить couchbase.

IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(this.acarsMqListener))  //MQ Listener with session transacted=true
                .wireTap(ACARS_WIRE_TAP_CHNL)                                 // Logging the message
                .transform(agmTransformer, "parseXMLMessage")                 // Parse the xml message
                .filter(acarsFilter,"filterMessageOnSmi")                     // Filter the message based on condition
                .transform(agmTransformer, "populateImi")                     // Parse and Populate based on the message payload
                .filter(acarsFilter,"filterMessageOnSmiImi")                  // Filter the message based on condition
                .handle(acarsProcessor, "processEvent")                       // Create the message
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate).messageKey(MESSAGE_KEY).topic(acarsKafkaTopic))  //send it to kafka
                .handle(updateCouchbase, "saveToDB")                          // Update couchbase
                .get();

Согласно логике приложения - сообщение должно храниться в kafka и couchbase, если при сохранении сообщения в kafka и couchbase существует какое-либо исключение, сообщение следует откатить в очередь.Обрабатывает ли вышеуказанный поток сообщений ожидаемое поведение?Можете ли вы предложить, если какие-либо улучшения могут быть сделаны?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...