Если вы используете процессоры NiFi Kafka (ConsumeKafka / ConsumeKafkaRecord) для получения сообщений от Kafka, они будут выводить сообщения в виде FlowFiles
.Они содержат один атрибут с именем kafka.topic
, который будет содержать название темы, из которой пришло сообщение.
Для маршрутизации сообщений на основе имени темы вы можете использовать процессор RouteOnAttribute
.Например, у вас есть две темы topicA
& topicB
.Затем необходимо настроить процессор RouteOnAttribute следующим образом:
![enter image description here](https://i.stack.imgur.com/C9Zyv.jpg)
![enter image description here](https://i.stack.imgur.com/5NWYN.jpg)
Затем подключитеОтношение topic-a
& topic-b
к отдельным потокам на основе ваших требований.Если вы добавляете больше источников Kafka, все, что вам нужно сделать, это обновить RouteOnAttribute
еще одним динамическим отношением.Пример: topic-c : ${kafka.topic:equals('topicC')}