Присвоение имен разделенным файлам в nifi для конкретной таблицы, а затем сброс для другой таблицы - PullRequest
0 голосов
/ 06 июня 2018

Я выполняю следующие действия в nifi: извлечение данных из таблиц в кусте и последующая маршрутизация потоковых файлов в зависимости от размера: если размер потокового файла равен gt 2 ГБ, то разделить потоковый файл на несколько потоковых файлов по 2 ГБ каждый.Я хочу использовать атрибут update для именования этих разделений, таких как TableName_001_001, Tablename_001_002, Tablename_001_003 для определенного файла потока или таблицы.

Когда следующий файл потока попадает в разделение, его также следует называть как выше.

Есть ли какой-нибудь способ, которым мы можем сделать с существующим процессором?

Ответы [ 2 ]

0 голосов
/ 07 июня 2018

Направьте файл данных куста в зависимости от размера для дальнейшего разделения на некоторый порог, например 1 ГБ (процессор RouteOnAttribute)

Добавление разделения свойств: $ {fileSize: gt (размер в байтах)}

введите описание изображения здесь

Назовите файлы на основе атрибута фрагмента.index, добавленного после процессора splitText

Перейдите в расширенный раздел обработчика UpdateAttribute иДобавить правила

Добавить правила и действия, основанные на вашем случае использования.Я должен обновить имя файла, чтобы я использовал имя файла атрибута и добавил $ {фрагмент.index} к суффиксу имени файла.Если индекс меньше 10, тогда мне нужно 00 {фрагмент.index}, для двузначного значения индекса 0 {фрагмент.index}, для отдыха я хотел только {фрагмент.index}.

Добавлениеправила для обновления процессора атрибутов с помощью языка выражений

имя файла: $ {query.input.tables} _001_00 $ {фрагмент.index} .txt

query.input.tables даст имястола, извлеченного из улья.и добавляется суффикс 00 $ {фрагмент.index}

Таким образом, мы можем использовать мощь языка выражений в Apache Nifi

0 голосов
/ 06 июня 2018

Если вы используете любые другие процессоры Кроме процессора SplitRecord для разбиения потокового файла на более мелкие куски, то каждый потоковый файл будет иметь атрибут фрагмент.index , связанный с потоковым файлом.

Поскольку у вас есть имя таблицы в качестве атрибута для файла потока и используйте эти атрибуты (имя_таблицы и фрагмент.index) и объедините их в единицу для Создайте новый обязательный атрибут enter image description here

Я предполагаю, что tab_name является атрибутом имени таблицы, а также Добавить новое свойство в обработчик атрибутов обновления и

Кроме того, если вы хотите сохранить эти атрибуты уникальнымизатем вы можете добавить значение метки времени в конце, как

new_attribute

${tab_name}_${fragment.index}_${now():toNumber()}
...