Планирование одной группы процессов nifi для нескольких таблиц в базе данных - PullRequest
0 голосов
/ 25 мая 2018

Я создал группу процессов со следующими требованиями в Nifi:

Извлечение данных из таблицы кустов >> Шифрование содержимого >> загрузка в хранилище BLOB-объектов Azure.

Теперь у меня есть 3000 таблиц длякоторый вышеупомянутый поток должен быть запланирован.Есть ли способ использовать только один поток для всех таблиц вместо создания 3000 потоков для каждой таблицы.

введите описание изображения здесь

Также я хочу выполнитьЛазурное хранилище для некоторых таблиц не для всех.Есть ли способ дать инструкции в потоке на основе любого условия, что Таблица 1 должна идти только в gcloud, а не в Azure.Точно так же я хочу, чтобы Таблица 2 работала как с лазурью, так и с gcloud.

Заранее спасибо

1 Ответ

0 голосов
/ 25 мая 2018

Есть пара вещей, которые можно сделать, но я надеюсь, что некоторые оптимальные подходы все еще существуют.

Нет необходимости в 3000 потоков

SelectHiveQLможет взять входной файл потока, который может содержать запрос SELECT, поэтому мне приходит в голову один подход: вы можете иметь 3000 запросов SELECT в файле и использовать GetFile, чтобы прочитать этот файл, и использовать SplitText, чтобы разделить его назатем подключите отношение splits к процессору SelectHiveQL и примените логику дальнейшей обработки, например EncryptContent, и, при необходимости, запишите в WASB.

Маршрутизация на основе принятия решений

Для вашего второго запроса, который решает, какие файлы следует поместить в BLOB-объект Azure, а какие - в GCS Bucket, вы можете использовать процессор RouteOnAttribute.

SelectHiveQL записывает атрибут query.input.tablesкоторый содержит список таблиц, выбранных в формате через запятую.Если вы запросили только одну таблицу, одно только имя таблицы будет записано для query.input.tables.Таким образом, в RouteOnAttribute вы можете иметь свойство динамической связи, например:

  • Destination.Azure : ${query.input.tables:in("Table_1", "Table_15")}
  • Destination.GCS : ${query.input.tables:in("Table_2", "Table_5")}

Затем подключите Destination.Azure кПроцессор PutAzureBlobStorage и Destination.GCS до PutGCSObject.

В приведенном выше примере я использовал оператор in, который предлагает язык выражений NiFi, вы можете использовать другие операторы на основе соглашения об именовании таблиц.Например, если вы хотите, чтобы таблицы клиентов помещались в таблицы BLOB-объектов Azure и таблицы поставщиков в GCS, вы можете использовать это:

  • Destination.Azure : ${query.input.tables:startsWith("customer")}
  • Destination.GCS : ${query.input.tables:startsWith("supplier")}

Полный список операторов и функций, предлагаемых языком выражений NifFi, можно найти в этом документе .

.
...