NiFi: написание кода для процессора ExecuteScript для объединения нескольких записей - PullRequest
0 голосов
/ 09 октября 2018

У меня есть поток NiFi (который работает), который разбивает огромную электронную таблицу на отдельные csv по названию компании.

Например,

GetFile -> SplitText -> PartitionRecord -> MergeContent ->
UpdateAttribute -> PutFile

Это приводит, например,

enter image description here

Проблема связана с csv похоже на то, где одна и та же компания вводится несколько иначе:

enter image description here

Я знаю, что мне нужно вставить сюда процессор ExecuteScript,Мне нужно объединить все такие дубликаты в один файл без необходимости прочесывать несколько тысяч строк, чтобы выбрать каждую компанию, которая была введена несколькими способами.

Я думаю, что это легко сделать с помощью Groovy:

flowFile = session.get()
if(!flowFile) return
myAttr = flowFile.getAttribute('filename')

Не уверен, куда идти.

1 Ответ

0 голосов
/ 10 октября 2018

Вы можете объединить все дубликаты с помощью UpdateAttribute процессор Adavanced Использование и затем сохранить файлы вкаталоги .. !!

Поток:

GetFile -> SplitText -> PartitionRecord -> UpdateAttribute-> MergeContent
 -> PutFile

UpdateAttribute Configs:

enter image description here

  1. После процессора PartitionRecord у вас будет поле раздела в качестве атрибута для файла потока, на основе этого значения атрибута вы можете написать Правила, например

    • Добавление нового правила с условиями

    ${partition_field_name:toLower():contains("campbell")}

    В вышеприведенном правиле мы проверяем, содержит ли значение атрибута partition_field_name Кэмпбелл, как этот способ, используя Язык выражений NiFI и добавьте свою логику для определения всех видов значений разделов и выполнения желаемых действий.

  2. Если да, то мы добавляем атрибут flowfile, т.е.

    имя файла со значением campbell.csv

  3. Тогда в вашемПроцессор слияния содержимого сконфигурируйте следующее свойство как Имя атрибута корреляции -> имя файла Теперь все файлы одного типа собираются вместе.

  4. Если вы придерживаетесь этого подхода, вам нужно изменить PutFile configs Стратегия разрешения конфликтов , потому что у нас снова будет такое же имя файла.

(или)

  1. Было бы лучше, если бы вы создали атрибут каталога в первом атрибуте UpdateAttribute на основе аргумента partition_value.

  2. Затем используйте процессор MergeContentс именем атрибута корреляции как directory , Теперь мы объединили все файлы, принадлежащие одному и тому же каталогу.

  3. Затем используйте другой процессор UpdateAttribute дляизмените имя файла на уникальное значение, например UUID (or) timestamp..etc.

  4. Сохранение данных с использованием процессора PutFile динамически на основе атрибут каталога значение.

Расход:

  GetFile 
  -> SplitText 
  -> PartitionRecord 
  -> UpdateAttribute //add directory attribute 
  -> MergeContent 
  -> UpdateAttribute //change filename to ${UUID()}
  -> PutFile
...