Интеграция Spring - регистрируйте каждое сообщение с помощью mdc, чтобы иметь возможность агрегировать сообщения - PullRequest
0 голосов
/ 04 февраля 2019

Я использую SpringIntegration IntegrationFlow для определения потока сообщений и использовал Jms.messageDrivenChannelAdapter для получения сообщения из MQ, теперь мне нужно проанализировать его, отправить его в KAFKA и обновить couchbase.

IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(this.acarsMqListener))  //MQ Listener with session transacted=true
                .wireTap(ACARS_WIRE_TAP_CHNL)                                 // Logging the message
                .transform(agmTransformer, "parseXMLMessage")                                 .filter(acarsFilter,"filterMessageOnSmiImi")                  // Filter the message based on condition
                .handle(acarsProcessor, "processEvent")                       // Create the message
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate).messageKey(MESSAGE_KEY).topic(acarsKafkaTopic))  //send it to kafka
                .handle(updateCouchbase, "saveToDB")                          // Update couchbase
                .get();

Для каждого полученного сообщения мы хотим регистрировать его, используя MDC, чтобы помочь нам собрать / агрегировать его на основе UUID.Пожалуйста, предложите, как поместить UUID в MDC, а затем очистить MDC для каждого сообщения в вышеупомянутом потоке

1 Ответ

0 голосов
/ 04 февраля 2019

Вы можете просто настроить global WireTap и выполнить соответствующее преобразование в этом потоке с прослушиванием, прежде чем регистрировать сообщение.

С другой стороны, может быть простонеобходимо поиграть с MDC, так как вы можете вставить в сообщение что-то вроде заголовка For и, как обычно, log(), чтобы вы видели сообщения в журналах и могли коррелировать с помощью этого заголовка.

...