Как я могу отфильтровать файлы потока по результату запроса SQL? - PullRequest
0 голосов
/ 25 сентября 2018

Можно ли направить файлы потока в соответствии с результатом запроса SQL, который возвращает результат в одну строку?Например, если результат равен «1», файл потока будет обработан;в противном случае он будет проигнорирован.

Решение

Для меня лучше всего подойдет следующий подход.

  1. Для запуска используйте процессор ExecuteSQL фильтрация SQL-запросов.Запрос был написан для получения либо одной записи (совпадения), либо пустого набора записей (без совпадения) способом, предложенным Шу.
  2. Connect ExecuteSQL к RouteOnAttribute процессор, чтобы отфильтровать несопоставленные файлы потока, используя следующее значение значения свойства маршрутизации ${executesql.row.count:replaceNull(0):gt(0)}

Обратите внимание , что исходное содержимое файла потока будет потеряно послеприменение ExecuteSQL .В моем случае это не проблема, потому что я выполняю фильтрацию перед обработкой содержимого потокового файла, и мой SQL-запрос полностью основан на атрибутах потокового файла, а не на его содержимом.Хотя в более общем сценарии, когда содержимое файла потока изменяется входящей частью потока, необходимо сохранить содержимое файла где-нибудь (например, файловую систему) и восстановить его после применения фильтрующей части.

1 Ответ

0 голосов
/ 25 сентября 2018

Вы можете добавить условие where в свой sql-запрос where <field_name> = 1, тогда мы получим flowfile только тогда, когда значение результата = 1 .

(или)

Проверка данных в NiFi:

У нас будет AVRO форматировать данные в результате запроса SQL, чтобы вы могли использовать

option1: ConvertAvroToJson Processor:

Преобразовать AVRO данные в формате JSON затем извлеките значение из содержимого json в качестве атрибута, используя процессор EvaluateJsonPath.

Затем используйте RouteOnAttribute процессор добавляет новое свойство, используя Язык выражения выражения NiFi равен функции сравнивает значение и направляет потоковый файл к сопоставленному отношению.

См. эту ссылку для получения более подробной информации относительно EvaluateJsonpathи конфигурации процессора RouteOnAttribute.

option2: Использование QueryRecord process:

Используя процессор QueryRecord , мы можем запустить SQL-запросов для содержимого потокового файла

Добавьте новое свойство к процессору как

select * from FLOWFILE where <filed_name> =1 

Передайте отношение свойства к другому процессору

См. эту ссылку для получения дополнительной информации об использовании процессора QueryRecord.

...