Я использую SpringIntegration IntegrationFlow для определения потока сообщений и использовал Jms.messageDrivenChannelAdapter для получения сообщения из MQ, теперь мне нужно проанализировать его, отправить его в KAFKA и обновить couchbase.
IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(this.acarsMqListener)) //MQ Listener with session transacted=true
.wireTap(ACARS_WIRE_TAP_CHNL) // Logging the message
.transform(agmTransformer, "parseXMLMessage") // Parse the xml message
.filter(acarsFilter,"filterMessageOnSmi") // Filter the message based on condition
.transform(agmTransformer, "populateImi") // Parse and Populate based on the message payload
.filter(acarsFilter,"filterMessageOnSmiImi") // Filter the message based on condition
.handle(acarsProcessor, "processEvent") // Create the message
.handle(Kafka.outboundChannelAdapter(kafkaTemplate).messageKey(MESSAGE_KEY).topic(acarsKafkaTopic)) //send it to kafka
.handle(updateCouchbase, "saveToDB") // Update couchbase
.get();
Согласно логике приложения - сообщение должно храниться в kafka и couchbase, если при сохранении сообщения в kafka и couchbase существует какое-либо исключение, сообщение следует откатить в очередь.Обрабатывает ли вышеуказанный поток сообщений ожидаемое поведение?Можете ли вы предложить, если какие-либо улучшения могут быть сделаны?