Используя Spark в GCP Dataproc, я успешно записываю весь RDD в GCS следующим образом:
rdd.saveAsTextFile(s"gs://$path")
Продукты - это файлы для каждого раздела по одному пути.
Как мнезаписывать файлы для каждого раздела (с уникальным путем, основанным на информации из раздела)
Ниже приведен придуманный нерабочий пример желаемого кода
rdd.mapPartitionsWithIndex(
(i, partition) =>{
partition.write(path = s"gs://partition_$i", data = partition_specific_data)
}
)
, когда я вызываю функцию ниже изнутрираздел на моем Mac записывает на локальный диск, на Dataproc я получаю сообщение об ошибке, не распознавая gs как допустимый путь.
def writeLocally(filePath: String, data: Array[Byte], errorMessage: String): Unit = {
println("Juicy Platform")
val path = new Path(filePath)
var ofos: Option[FSDataOutputStream] = null
try {
println(s"\nTrying to write to $filePath\n")
val conf = new Configuration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
// conf.addResource(new Path("/home/hadoop/conf/core-site.xml"))
println(conf.toString)
val fs = FileSystem.get(conf)
val fos = fs.create(path)
ofos = Option(fos)
fos.write(data)
println(s"\nWrote to $filePath\n")
}
catch {
case e: Exception =>
logError(errorMessage, s"Exception occurred writing to GCS:\n${ExceptionUtils.getStackTrace(e)}")
}
finally {
ofos match {
case Some(i) => i.close()
case _ =>
}
}
}
Это ошибка:
java.lang.IllegalArgumentException: Wrong FS: gs://path/myFile.json, expected: hdfs://cluster-95cf-m