NiFi: Фильтровать тему при потреблении от Kafka - PullRequest
1 голос
/ 03 мая 2019

В kafka есть пакетная тема (Json content), которую нужно использовать через NiFi(version 1.8).Я могу использовать эту тему, используя consumekafkarecord процессор, но хотел бы фильтровать по значению атрибута, поскольку мне не нужны все записи из этой темы.

Можно ли выполнить фильтр при использованииКафка тема even before getting the records into NiFi?Каков наилучший подход для этого, например, использование Processors или Scripts?

Я просто хочу отфильтровать огромное количество записей на основе одного из значений атрибута, поскольку они не нужны.

1 Ответ

1 голос
/ 03 мая 2019

Я не знаю способа фильтрации записей внутри ConsumeKafkaRecord, но вы можете легко сделать это сразу после этого процессора.

Один из вариантов - подключить его к процессору QueryRecord и написать оператор SQL, который выбирает интересующие вас записи.

Второй вариант - использовать PartitionRecord, который позволяет разделять записи на основе выражения пути записи. Таким образом, вы можете сказать разделение на field1, и если есть два значения, таких как A и B, то он создаст два потоковых файла, один, содержащий все записи с A, и один, содержащий все записи с B, тогда вы просто направите то, что вас интересует и отправьте другого в тупик.

...