NiFi может генерировать операторы создания таблиц на основе содержимого потокового файла.
1.Создание таблиц ORC с помощью процессора ConvertAvroToORC:
если вы конвертируете данные avro в формат ORC, а затем сохраняете в HDFS, то процессор ConvertAvroToORC добавляет атрибут hive.ddl
в файл потока.
Процессор PutHDFS добавляет absolute.hdfs.path
атрибут для файла потока.
Мы можем использовать эти атрибуты hive.ddl , absolute.hdfs.path и создать таблицу орков вначало каталога HDFS динамически.
Поток:
Pull data from source(ExecuteSQL...etc)
-> ConvertAvroToORC //add Hive DbName,TableName in HiveTableName property value-->
-> PutHDFS //store the orc file into HDFS location -->
-> ReplaceText //Replace the flowfile content with ${hive.ddl} Location '${absolute.hdfs.path}'-->
-> PutHiveQL //execute the create table statement
См. эту ссылку для получения более подробной информации.приведенный выше поток.
2. Создание таблиц Avro с использованием процессора ExtractAvroMetaData:
В NiFi, как только мы извлекаем данныес помощью QueryDatabaseTable, процессоров ExecuteSQL формат данных в AVRO .
Мы можем создать таблицы Avro на основе схемы avro (файл .avsc) и с помощью процессора ExtractAvroMetaData мы можем извлечь схему и сохранить ее в качестве атрибута flowfileзатем, используя эту схему, мы можем динамически создавать таблицы AvroTable.
Flow:
ExecuteSQL (success)|-> PutHDFS //store data into HDFS
(success)|-> ExtractAvroMetadata //configure Metadata Keys as avro.schema
-> ReplaceText //replace flowfile content with avro.schema
-> PutHDFS //store the avsc file into schema directory
-> ReplaceText //create avro table on top of schema directory
-> PutHiveQL //execute the hive.ddl
Пример AVRO Создайте оператор таблицы:
CREATE TABLE as_avro
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED as INPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
TBLPROPERTIES (
'avro.schema.url'='/path/to/the/schema/test_serializer.avsc');
Мы собираемся изменить путь к URL схемы с помощью процессора ReplaceText в указанном выше потоке.
Другой способс помощью процессора ExecuteSQL получить все инструкции для создания таблицы (или) столбцов с информацией (sys.tables / INFORMATION_SCHEMA.COLUMNS ..etc) из источника (если источниксистема разрешает) и напишите сценарий для map the data types
в hive appropriate types
, затем сохраните их в своем desired format
в Hive.
РЕДАКТИРОВАТЬ:
Чтобы запустить команду grep
для содержимого потокового файла, нам нужно использовать ExecuteStreamCommand Процессор
ESC Configs:
Затем передать отношение output stream
в процессор ExtractText
ET Конфиги:
Добавить новое свойство как
содержимое
(?s)(.*)
Затем content attribute
добавляется в файл потока. Вы можете использовать этот атрибут и подготовить операторы создания таблицы.