Сохранение RDD в виде текстового файла дает исключение FileAlreadyExists. Как создавать новый файл каждый раз, когда программа загружается, и удалять старый, используя FileUtils - PullRequest
0 голосов
/ 28 августа 2018

Код:

val badData:RDD[ListBuffer[String]] = rdd.filter(line => line(1).equals("XX") || line(5).equals("XX"))
badData.coalesce(1).saveAsTextFile(propForFile.getString("badDataFilePath"))

В первый раз программа работает нормально. При повторном запуске выдает исключение для файла AlreadyExists. Я хочу решить эту проблему, используя FileUtils функциональные возможности Java и сохранить rdd в виде текстового файла.

Ответы [ 3 ]

0 голосов
/ 28 августа 2018

Перед записью файла по указанному пути удалите уже существующий путь.

val fs = FileSystem.get(sc.hadoopConfiguration)
fs.delete(new Path(bad/data/file/path), true)

Затем выполните ваш обычный процесс записи. Надеюсь, что это решит проблему.

0 голосов
/ 04 сентября 2018
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

val fs = spark.SparkContext.hadoopCofigurations
if (fs.exists(new Path(path/to/the/files)))
    fs.delete(new Path(path/to/the/files), true)

Передать имя файла как String методу, если каталог или файлы будут удалены. Используйте этот фрагмент кода перед записью его в выходной путь.

0 голосов
/ 28 августа 2018

Почему бы не использовать DataFrames? Получите RDD[ListBuffer[String] в RDD[Row] - что-то вроде -

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
val badData:RDD[ListBuffer[String]] = rdd.map(line => 
  Row(line(0), line(1)... line(n))
 .filter(row => filter stuff)
badData.toDF().write.mode(SaveMode.Overwrite)
...