Я перепробовал все подходы. Вот упрощенный код:
Подход 1
Подход 1 использует команды Had oop из сценария bash и др. c.
Это работает, но требует двойной записи HDFS и очистки. Также он не очень хорошо вписывается в Scala проект Spark.
(echo -e "version=2\ndate=2020-01-31\n\nid,name,age" | gzip -vc ; hadoop fs -cat "$INPUT_DIR/*" ) | hadoop fs -put - "$OUTPUT_PATH"
Здесь происходит то, что он
- выводит многострочный заголовок в стандартный вывод
- передать это в gzip и на стандартный вывод
- передать другое HDSF-каталог на стандартный вывод
- передать в
hadoop fs -put
, который объединит все
Подход 2
Код немного сложнее, в заголовке есть неплохие символы кавычек, но заголовки иногда идут после части csv.
import org.apache.hadoop.io.compress.GzipCodec
val heading = """version=2
date=2020-01-31
id,name,age""".split("\n", -1).toSeq
val headingRdd: RDD[String] = sc.parallelize(heading)
val mediamathRdd: RDD[String] = df.rdd.map(row => row.mkString(","))
val combinedResult: RDD[String] = (headingRdd union mediamathRdd)
combinedResult.repartition(1).saveAsTextFile(path, classOf[GzipCodec])
Подход 3
Самый простой подход, но выход немного отключен
df.repartition(1)
.withColumnRenamed("id", "version=2\ndate=2020-01-31\n\nid")
.option("header", true)
.option("delimiter", ",")
.option("quoteMode", "NONE")
.option("quote", " ")
.option("codec", "gzip")
.csv(path)
Результат будет выглядеть так, что может или может быть неприемлемым
version=2
date=2020-01-31
id ,name,age
1,Alice,21
2,Bob,23
Я также пытался с:
.option("quote", "\u0000")
Это фактически печатает нулевой устав ascii, и хотя это не было обнаружено в моем HDFS viewer это не было частью spe c.
Наилучший подход
Ни один из них не идеально подходит для того, что кажется очень простой задачей. Возможно, есть небольшое исправление, чтобы подход 2 работал идеально.