По сути, вы пытаетесь создать CSV-файлы только с одним заголовком для всех из них.Одним простым решением будет использование coalesce(1)
и удаление введенного вами repartition(10)
.Проблема в том, что все данные отправляются в один раздел.Это может быть очень медленно или хуже, выбросить ошибку OOM.Тем не менее (если это работает) вы получаете один большой файл с одним заголовком.
Чтобы продолжать пользоваться преимуществами параллелизма spark a, вы можете написать заголовок отдельно, как это (при условии, что у нас есть фрейм данных df
)
val output = "hdfs:///...path.../output.csv"
val merged_output = "hdfs:///...path.../merged_output.csv"
import spark.implicits._
// Let's build the header
val header = responseWithSelectedColumns
.schema.fieldNames.reduceLeft(_+","+_)
// Let's write the data
responseWithSelectedColumns.write.csv(output)
// Let's write the header without spark
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
val f = hdfs.create(new Path(output + "/header"))
f.write(header.getBytes)
f.close()
// Let's merge everything into one file
FileUtil.copyMerge(hdfs, new Path(output), hdfs, new Path(merged_output),
false,hadoopConfig, null)
Также обратите внимание, что spark 2.x поддерживает запись csv из коробки.Это то, что я использовал вместо библиотеки блоков данных, что делает вещи немного более многословными.