На самом деле можно использовать процессор PutParquet.
Следующее описание относится к рабочему потоку в nifi-1.8.
Поместите следующие библиотеки в папку, например home/nifi/s3libs/
:
- aws-java-sdk-1.11.455.jar (+ сторонние библиотеки)
- hadoop-aws-3.0.0.jar
Создайте XML-файл, например /home/nifi/s3conf/core-site.xml
.Может потребоваться дополнительная настройка, используйте правильную конечную точку для вашей зоны.
<configuration>
<property>
<name>fs.defaultFS</name>
<value>s3a://BUCKET_NAME</value>
</property>
<property>
<name>fs.s3a.access.key</name>
<value>ACCESS-KEY</value>
</property>
<property>
<name>fs.s3a.secret.key</name>
<value>SECRET-KEY</value>
</property>
<property>
<name>fs.AbstractFileSystem.s3a.imp</name>
<value>org.apache.hadoop.fs.s3a.S3A</value>
</property>
<property>
<name>fs.s3a.multipart.size</name>
<value>104857600</value>
<description>Parser could not handle 100M. replacing with bytes. Maybe not needed after testing</description>
</property>
<property>
<name>fs.s3a.endpoint</name>
<value>s3.eu-central-1.amazonaws.com</value>
<description>Frankfurt</description>
</property>
<property>
<name>fs.s3a.fast.upload.active.blocks</name>
<value>4</value>
<description>
Maximum Number of blocks a single output stream can have
active (uploading, or queued to the central FileSystem
instance's pool of queued operations.
This stops a single stream overloading the shared thread pool.
</description>
</property>
<property>
<name>fs.s3a.threads.max</name>
<value>10</value>
<description>The total number of threads available in the filesystem for data
uploads *or any other queued filesystem operation*.</description>
</property>
<property>
<name>fs.s3a.max.total.tasks</name>
<value>5</value>
<description>The number of operations which can be queued for execution</description>
</property>
<property>
<name>fs.s3a.threads.keepalivetime</name>
<value>60</value>
<description>Number of seconds a thread can be idle before being terminated.</description>
</property>
<property>
<name>fs.s3a.connection.maximum</name>
<value>15</value>
</property>
</configuration>
Использование
Создание процессора PutParquet
.В разделе Свойства установите
- Ресурсы конфигурации Hadoop:
/home/nifi/s3conf/core-site.xml
, - Дополнительные ресурсы пути к классам:
/home/nifi/s3libs
, - Каталог:
s3a://BUCKET_NAME/folder/
(EL доступен) - Тип сжатия: протестировано с NONE, SNAPPY
- Удалить CRC: true
Файл потока должен содержать атрибут filename
-Никаких причудливых символов или косых черт.