Я новый разработчик в Spark Scala, и я хочу спросить вас о моей проблеме.
У меня есть два огромных кадра данных, мой второй кадр данных вычисляется из первого кадра данных (он содержит отдельный столбец изпервый).
Чтобы оптимизировать мой код, я подумал о таком подходе:
- Зарегистрировать мой первый кадр данных в виде файла .csv в HDFS
- А потом простопрочитайте этот файл .csv для вычисления второго фрейма данных.
Итак, он написал следующее:
//val temp1 is my first DF
writeAsTextFileAndMerge("result1.csv", "/user/result", temp1, spark.sparkContext.hadoopConfiguration)
val temp2 = spark.read.options(Map("header" -> "true", "delimiter" -> ";"))
.csv("/user/result/result1.csv").select("ID").distinct
writeAsTextFileAndMerge("result2.csv", "/user/result",
temp2, spark.sparkContext.hadoopConfiguration)
И это моя функция сохранения:
def writeAsTextFileAndMerge(fileName: String, outputPath: String, df: DataFrame, conf: Configuration) {
val sourceFile = WorkingDirectory
df.write.options(Map("header" -> "true", "delimiter" -> ";")).mode("overwrite").csv(sourceFile)
merge(fileName, sourceFile, outputPath, conf)
}
def merge(fileName: String, srcPath: String, dstPath: String, conf: Configuration) {
val hdfs = FileSystem.get(conf)
val destinationPath = new Path(dstPath)
if (!hdfs.exists(destinationPath))
hdfs.mkdirs(destinationPath)
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath + "/" + fileName),
true, conf, null)
}
Это кажется "логичным" для меня, но я получил ошибки при этом.Я полагаю, что Spark не может «подождать», пока не зарегистрирует мой первый DF в HDFS, и ПОСЛЕ того, чтобы прочитать этот новый файл (или, может быть, у меня возникли некоторые ошибки в функции сохранения?).
Вот исключение, которое яполучил:
19/02/16 17:27:56 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.ArrayIndexOutOfBoundsException: 1
java.lang.ArrayIndexOutOfBoundsException: 1
Можете ли вы помочь мне исправить это, пожалуйста?