Есть пара вещей, которые можно сделать, но я надеюсь, что некоторые оптимальные подходы все еще существуют.
Нет необходимости в 3000 потоков
SelectHiveQL
может взять входной файл потока, который может содержать запрос SELECT
, поэтому мне приходит в голову один подход: вы можете иметь 3000 запросов SELECT в файле и использовать GetFile
, чтобы прочитать этот файл, и использовать SplitText
, чтобы разделить его назатем подключите отношение splits
к процессору SelectHiveQL
и примените логику дальнейшей обработки, например EncryptContent
, и, при необходимости, запишите в WASB.
Маршрутизация на основе принятия решений
Для вашего второго запроса, который решает, какие файлы следует поместить в BLOB-объект Azure, а какие - в GCS Bucket, вы можете использовать процессор RouteOnAttribute
.
SelectHiveQL
записывает атрибут query.input.tables
который содержит список таблиц, выбранных в формате через запятую.Если вы запросили только одну таблицу, одно только имя таблицы будет записано для query.input.tables
.Таким образом, в RouteOnAttribute
вы можете иметь свойство динамической связи, например:
Destination.Azure : ${query.input.tables:in("Table_1", "Table_15")}
Destination.GCS : ${query.input.tables:in("Table_2", "Table_5")}
Затем подключите Destination.Azure
кПроцессор PutAzureBlobStorage
и Destination.GCS
до PutGCSObject
.
В приведенном выше примере я использовал оператор in
, который предлагает язык выражений NiFi, вы можете использовать другие операторы на основе соглашения об именовании таблиц.Например, если вы хотите, чтобы таблицы клиентов помещались в таблицы BLOB-объектов Azure и таблицы поставщиков в GCS, вы можете использовать это:
Destination.Azure : ${query.input.tables:startsWith("customer")}
Destination.GCS : ${query.input.tables:startsWith("supplier")}
Полный список операторов и функций, предлагаемых языком выражений NifFi, можно найти в этом документе .
.