Конвертировать Avro в Паркет в NiFi - PullRequest
0 голосов
/ 19 февраля 2019

Я хотел бы конвертировать файлы Avro в паркет в NiFi.Я знаю, что можно конвертировать в ORC с помощью процессора ConvertAvroToORC , но я не нашел решения для конвертации в Parquet.

Я конвертирую JSON в Avro через Процессор ConvertRecord (JsonTreeReader и AvroRecordSetWriter).После этого я хотел бы преобразовать полезную нагрузку Avro в Parquet, прежде чем поместить ее в корзину S3.Я не хочу хранить его в HDFS, поэтому процессор PutParquet кажется неприменимым.

Мне понадобится процессор, такой как: ConvertAvroToParquet

Ответы [ 2 ]

0 голосов
/ 20 февраля 2019

@ Мартин, вы можете использовать очень удобный процессор ConvertAvroToParquet, который я недавно добавил в Nifi.Он должен быть доступен в последней версии.

Назначение этого процессора в точности соответствует тому, что вы ищете.Для получения более подробной информации об этом процессоре и почему он был создан: Nifi-5706

Код Ссылка .

0 голосов
/ 19 февраля 2019

На самом деле можно использовать процессор PutParquet.

Следующее описание относится к рабочему потоку в nifi-1.8.

Поместите следующие библиотеки в папку, например home/nifi/s3libs/:

  • aws-java-sdk-1.11.455.jar (+ сторонние библиотеки)
  • hadoop-aws-3.0.0.jar

Создайте XML-файл, например /home/nifi/s3conf/core-site.xml.Может потребоваться дополнительная настройка, используйте правильную конечную точку для вашей зоны.

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>s3a://BUCKET_NAME</value>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <value>ACCESS-KEY</value>
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <value>SECRET-KEY</value>
    </property>
    <property>
        <name>fs.AbstractFileSystem.s3a.imp</name>
        <value>org.apache.hadoop.fs.s3a.S3A</value>
    </property>
    <property>
        <name>fs.s3a.multipart.size</name>
        <value>104857600</value>
        <description>Parser could not handle 100M. replacing with bytes. Maybe not needed after testing</description>
    </property>
    <property>
        <name>fs.s3a.endpoint</name>
        <value>s3.eu-central-1.amazonaws.com</value> 
        <description>Frankfurt</description>
    </property>
    <property>
        <name>fs.s3a.fast.upload.active.blocks</name>
        <value>4</value>
        <description>
    Maximum Number of blocks a single output stream can have
    active (uploading, or queued to the central FileSystem
    instance's pool of queued operations.

    This stops a single stream overloading the shared thread pool.
        </description>
    </property>
    <property>
        <name>fs.s3a.threads.max</name>
        <value>10</value>
        <description>The total number of threads available in the filesystem for data
    uploads *or any other queued filesystem operation*.</description>
    </property>

    <property>
        <name>fs.s3a.max.total.tasks</name>
        <value>5</value>
        <description>The number of operations which can be queued for execution</description>
    </property>

    <property>
        <name>fs.s3a.threads.keepalivetime</name>
        <value>60</value>
        <description>Number of seconds a thread can be idle before being terminated.</description>
    </property>
    <property>
        <name>fs.s3a.connection.maximum</name>
        <value>15</value>
    </property>
</configuration>

Использование

Создание процессора PutParquet.В разделе Свойства установите

  • Ресурсы конфигурации Hadoop: /home/nifi/s3conf/core-site.xml,
  • Дополнительные ресурсы пути к классам: /home/nifi/s3libs,
  • Каталог: s3a://BUCKET_NAME/folder/ (EL доступен)
  • Тип сжатия: протестировано с NONE, SNAPPY
  • Удалить CRC: true

Файл потока должен содержать атрибут filename -Никаких причудливых символов или косых черт.

...