Создать DF после регистрации предыдущего DF в Spark Scala - PullRequest
0 голосов
/ 16 февраля 2019

Я новый разработчик в Spark Scala, и я хочу спросить вас о моей проблеме.

У меня есть два огромных кадра данных, мой второй кадр данных вычисляется из первого кадра данных (он содержит отдельный столбец изпервый).

Чтобы оптимизировать мой код, я подумал о таком подходе:

  • Зарегистрировать мой первый кадр данных в виде файла .csv в HDFS
  • А потом простопрочитайте этот файл .csv для вычисления второго фрейма данных.

Итак, он написал следующее:

//val temp1 is my first DF
writeAsTextFileAndMerge("result1.csv", "/user/result", temp1, spark.sparkContext.hadoopConfiguration)

val temp2 = spark.read.options(Map("header" -> "true", "delimiter" -> ";"))
      .csv("/user/result/result1.csv").select("ID").distinct

    writeAsTextFileAndMerge("result2.csv", "/user/result",
      temp2, spark.sparkContext.hadoopConfiguration)

И это моя функция сохранения:

def writeAsTextFileAndMerge(fileName: String, outputPath: String, df: DataFrame, conf: Configuration) {
    val sourceFile = WorkingDirectory
    df.write.options(Map("header" -> "true", "delimiter" -> ";")).mode("overwrite").csv(sourceFile)
    merge(fileName, sourceFile, outputPath, conf)
  }

  def merge(fileName: String, srcPath: String, dstPath: String, conf: Configuration) {
    val hdfs = FileSystem.get(conf)
    val destinationPath = new Path(dstPath)
    if (!hdfs.exists(destinationPath))
      hdfs.mkdirs(destinationPath)
    FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath + "/" + fileName),
      true, conf, null)
  }

Это кажется "логичным" для меня, но я получил ошибки при этом.Я полагаю, что Spark не может «подождать», пока не зарегистрирует мой первый DF в HDFS, и ПОСЛЕ того, чтобы прочитать этот новый файл (или, может быть, у меня возникли некоторые ошибки в функции сохранения?).

Вот исключение, которое яполучил:

19/02/16 17:27:56 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.ArrayIndexOutOfBoundsException: 1
java.lang.ArrayIndexOutOfBoundsException: 1

Можете ли вы помочь мне исправить это, пожалуйста?

1 Ответ

0 голосов
/ 16 февраля 2019

Проблема в слиянии - Spark не знает и, следовательно, не синхронизируется со всеми операциями HDFS, которые вы делаете.

Хорошая новость заключается в том, что вам не нужно этого делать.просто сделайте df.write, а затем создайте новый фрейм данных со значением read (spark будет считывать все части в один df)

, то есть следующее будет работать нормально

temp1.write.options(Map("header" -> "true", "delimiter" -> ";")).mode("overwrite").csv("/user/result/result1.csv")
val temp2 = spark.read.options(Map("header" -> "true", "delimiter" -> ";"))
      .csv("/user/result/result1.csv").select("ID").distinct
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...