Используя Spark на Dataproc, как писать в GCS отдельно от каждого раздела? - PullRequest
0 голосов
/ 27 сентября 2018

Используя Spark в GCP Dataproc, я успешно записываю весь RDD в GCS следующим образом:

rdd.saveAsTextFile(s"gs://$path")

Продукты - это файлы для каждого раздела по одному пути.

Как мнезаписывать файлы для каждого раздела (с уникальным путем, основанным на информации из раздела)

Ниже приведен придуманный нерабочий пример желаемого кода

    rdd.mapPartitionsWithIndex(
      (i, partition) =>{

        partition.write(path = s"gs://partition_$i", data = partition_specific_data)
      }
    )

, когда я вызываю функцию ниже изнутрираздел на моем Mac записывает на локальный диск, на Dataproc я получаю сообщение об ошибке, не распознавая gs как допустимый путь.

def writeLocally(filePath: String, data: Array[Byte], errorMessage: String): Unit = {

println("Juicy Platform")

val path = new Path(filePath)

var ofos: Option[FSDataOutputStream] = null

try {

  println(s"\nTrying to write to $filePath\n")

  val conf = new Configuration()

  conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
  conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

  //      conf.addResource(new Path("/home/hadoop/conf/core-site.xml"))


  println(conf.toString)

  val fs = FileSystem.get(conf)

  val fos = fs.create(path)
  ofos = Option(fos)

  fos.write(data)

  println(s"\nWrote to $filePath\n")
}
catch {
  case e: Exception =>

    logError(errorMessage, s"Exception occurred writing to GCS:\n${ExceptionUtils.getStackTrace(e)}")
}
finally {
  ofos match {
    case Some(i) => i.close()
    case _ =>
  }
}
  }

Это ошибка:

java.lang.IllegalArgumentException: Wrong FS: gs://path/myFile.json, expected: hdfs://cluster-95cf-m

1 Ответ

0 голосов
/ 09 октября 2018

Если вы работаете в кластере Dataproc, вам не нужно явно указывать "fs.gs.impl" в конфигурации;new Configuration() уже должен содержать необходимые отображения.

Основная проблема здесь в том, что val fs = FileSystem.get(conf) использует свойство fs.defaultFS для conf;у него нет возможности узнать, хотите ли вы получить экземпляр FileSystem, специфичный для HDFS или GCS.В целом, в Hadoop и Spark экземпляр FileSystem фундаментально привязан к одному URL scheme;вам нужно получить экземпляр схемы для каждой отдельной схемы, такой как hdfs:// или gs:// или s3://.

Самое простое решение вашей проблемы - всегда использовать Path.getFileSystem(Конфигурация) в отличие от FileSystem.get(Configuration).И убедитесь, что ваш path полностью соответствует схеме:

...
val path = "gs://bucket/foo/data"
val fs = path.getFileSystem(conf)

val fos = fs.create(path)
ofos = Option(fos)

fos.write(data)
...
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...