Я использую SpringIntegration IntegrationFlow для определения потока сообщений и использовал Jms.messageDrivenChannelAdapter для получения сообщения из MQ, теперь мне нужно проанализировать его, отправить его в KAFKA и обновить couchbase.
IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(this.acarsMqListener)) //MQ Listener with session transacted=true
.wireTap(ACARS_WIRE_TAP_CHNL) // Logging the message
.transform(agmTransformer, "parseXMLMessage") .filter(acarsFilter,"filterMessageOnSmiImi") // Filter the message based on condition
.handle(acarsProcessor, "processEvent") // Create the message
.handle(Kafka.outboundChannelAdapter(kafkaTemplate).messageKey(MESSAGE_KEY).topic(acarsKafkaTopic)) //send it to kafka
.handle(updateCouchbase, "saveToDB") // Update couchbase
.get();
Для каждого полученного сообщения мы хотим регистрировать его, используя MDC, чтобы помочь нам собрать / агрегировать его на основе UUID.Пожалуйста, предложите, как поместить UUID в MDC, а затем очистить MDC для каждого сообщения в вышеупомянутом потоке