Если сообщение из потока Kafka будет получено в стандартном формате, это будет легко сделать.Ниже приведены действия, которые необходимо выполнить, если получающее сообщение находится в формате JSON.
- Настройка
ConsumeKafka
для получения из темы - Использование
EvaluateJsonPath
для анализа и чтения значениядля определенного ключа и назначьте его атрибуту NiFi FlowFile - Используйте процессор
ReplaceText
для формирования собственного сообщения (содержащего проанализированный ключ), которое вы хотите отправить в другую тему Kafka - Соедините поток с
PutKafka
Более подробную информацию о EvaluateJsonPath можно найти здесь и здесь