Извлечение многострочного содержимого из содержимого потокового файла - PullRequest
0 голосов
/ 23 сентября 2018

Я импортирую данные из таблицы MySQL (только для выбранных столбцов) и помещаю их в HDFS.Как только это будет сделано, я хочу создать таблицу в Hive.

Для этого у меня есть файл schema.sql, который содержит инструкцию CREATE TABLE для всей таблицы, и я хочу сгенерировать только новую инструкцию CREATE TABLE.для столбцов, которые я импортировал.

Что-то похожее на то, что я делаю с grep в следующем примере.

enter image description here

Я использовал FetchFile вместе сExtractText но не смог заставить его работать.Как я могу добиться этого, используя процессоры NiFi или даже Expression Language, если я получу общую схему в атрибут?

Или есть лучший способ создать таблицу на основе импортируемых данных?

1 Ответ

0 голосов
/ 23 сентября 2018

NiFi может генерировать операторы создания таблиц на основе содержимого потокового файла.

1.Создание таблиц ORC с помощью процессора ConvertAvroToORC:

  • если вы конвертируете данные avro в формат ORC, а затем сохраняете в HDFS, то процессор ConvertAvroToORC добавляет атрибут hive.ddl в файл потока.

  • Процессор PutHDFS добавляет absolute.hdfs.path атрибут для файла потока.

  • Мы можем использовать эти атрибуты hive.ddl , absolute.hdfs.path и создать таблицу орков вначало каталога HDFS динамически.

Поток:

 Pull data from source(ExecuteSQL...etc)
  -> ConvertAvroToORC //add Hive DbName,TableName in HiveTableName property value--> 
  -> PutHDFS //store the orc file into HDFS location --> 
  -> ReplaceText //Replace the flowfile content with ${hive.ddl} Location '${absolute.hdfs.path}'--> 
  -> PutHiveQL //execute the create table statement

См. эту ссылку для получения более подробной информации.приведенный выше поток.

2. Создание таблиц Avro с использованием процессора ExtractAvroMetaData:

  • В NiFi, как только мы извлекаем данныес помощью QueryDatabaseTable, процессоров ExecuteSQL формат данных в AVRO .

  • Мы можем создать таблицы Avro на основе схемы avro (файл .avsc) и с помощью процессора ExtractAvroMetaData мы можем извлечь схему и сохранить ее в качестве атрибута flowfileзатем, используя эту схему, мы можем динамически создавать таблицы AvroTable.

Flow:

ExecuteSQL (success)|-> PutHDFS //store data into HDFS
           (success)|-> ExtractAvroMetadata //configure Metadata Keys as avro.schema 
                     -> ReplaceText //replace flowfile content with avro.schema
                     -> PutHDFS //store the avsc file into schema directory
                     -> ReplaceText //create avro table on top of schema directory
                     -> PutHiveQL //execute the hive.ddl

Пример AVRO Создайте оператор таблицы:

CREATE TABLE as_avro
  ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
  STORED as INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
  OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
  TBLPROPERTIES (
    'avro.schema.url'='/path/to/the/schema/test_serializer.avsc');

Мы собираемся изменить путь к URL схемы с помощью процессора ReplaceText в указанном выше потоке.

Другой способс помощью процессора ExecuteSQL получить все инструкции для создания таблицы (или) столбцов с информацией (sys.tables / INFORMATION_SCHEMA.COLUMNS ..etc) из источника (если источниксистема разрешает) и напишите сценарий для map the data types в hive appropriate types, затем сохраните их в своем desired format в Hive.

РЕДАКТИРОВАТЬ:

Чтобы запустить команду grep для содержимого потокового файла, нам нужно использовать ExecuteStreamCommand Процессор

ESC Configs:

enter image description here

Затем передать отношение output stream в процессор ExtractText

ET Конфиги:

Добавить новое свойство как

содержимое

(?s)(.*)

enter image description here

Затем content attribute добавляется в файл потока. Вы можете использовать этот атрибут и подготовить операторы создания таблицы.

...