Я хочу написать паркет в HDFS
Лично я бы не использовал Spark для этого.
Скорее я бы использовал HDFS Kafka Connector . Вот файл конфигурации, который поможет вам начать.
name=hdfs-sink
# List of topics to read
topics=test_hdfs
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
# increase to be the sum of the partitions for all connected topics
tasks.max=1
# the folder where core-site.xml and hdfs-site.xml exist
hadoop.conf.dir=/etc/hadoop
# the namenode url, defined as fs.defaultFS in the core-site.xml
hdfs.url=hdfs://hdfs-namenode.example.com:9000
# number of messages per file
flush.size=10
# The format to write the message values
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
# Setup Avro parser
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry.example.com:8081
value.converter.schemas.enable=true
schema.compatibility=BACKWARD
Если вы хотите, чтобы разделы HDFS основывались на поле, а не на буквальном номере «раздела Kafka», то обратитесь к документации по конфигурации на FieldPartitioner
. Если вы хотите автоматическую интеграцию Hive, см. Также документы по этому вопросу.
Допустим, вы действительно хотели использовать Spark, хотя вы можете попробовать AbsaOSS / ABRiS для чтения в Avro DataFrame, тогда вы сможете сделать что-то вроде df.write.format("parquet").path("/some/path")
(не точный код, потому что я не пробовал)