Адаптер исходящего канала kafka Spring Integration - PullRequest
0 голосов
/ 20 февраля 2019

У меня есть объект модели, который заполняется после нескольких преобразований и разбора.Теперь мне нужно отправить атрибут сообщения внутри модели в kafka с помощью весенней интеграции.Я могу создать ключ, используя метод messageKey, но как я могу получить реальное сообщение из модели, такой как m.getPayload (). GetMessage (), и отправить его kafka.

                .publishSubscribeChannel(pubSub -> pubSub
                        .subscribe(flow -> flow
                            .bridge(e -> e.order(Ordered.HIGHEST_PRECEDENCE))
                            .handle(Kafka.outboundChannelAdapter(kafkaTemplate).
                                    messageKey(m -> ((AcarsFlightInformation) m.getPayload()).getFlightNbr()).topic(acarsKafkaTopic)))

1 Ответ

0 голосов
/ 20 февраля 2019

Не совсем понятно, о чем вы спрашиваете.Полезная нагрузка сообщения, отправляемого адаптеру, становится значением записи производителя.

Я думаю, что вы спрашиваете, что вы хотите отправить только часть полезной нагрузки.

Использовать заголовокобогащение и преобразование перед .handle ...

.enrichHeaders(h -> h.headerExpression(KafkaHeaders.MESSAGE_KEY, "payload.flightNumber")
.transform("payload.message")
.handle(Kafka.outboundChannelAdapter(kafkaTemplate)
      .topic(acarsKafkaTopic))
.get();

Адаптер будет искать этот заголовок для ключа.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...