Я хочу переименовать файлы HDFS параллельно, используя spark.Но я получаю исключение сериализации, я упомянул исключение после моего кода.Я получаю эту проблему при использовании spark.sparkContext.parallelize.Также я могу переименовать все файлы, когда делаю это в цикле.
def renameHdfsToS3(spark : SparkSession, hdfsFolder :String, outputFileName:String,
renameFunction: (String,String) => String, bktOutput:String, folderOutput:String, kmsKey:String): Boolean = {
try {
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val path = new Path(hdfsFolder)
val files = fs.listStatus(path)
.filter(fs => fs.isFile)
val parallelRename=spark.sparkContext.parallelize(files).map(
f=>{
parallelRenameHdfs(fs,outputFileName,renamePartFileWithTS,f)
}
)
val hdfsTopLevelPath=fs.getWorkingDirectory()+"/"+hdfsFolder
return true
} catch {
case NonFatal(e) => {
e.printStackTrace()
return false
}
}
}
Ниже приведено исключение, которое я получаю
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)
Caused by: java.io.NotSerializableException: org.apache.hadoop.fs.LocalFileSystem
Serialization stack:
- object not serializable (class: org.apache.hadoop.fs.LocalFileSystem, value: org.apache.hadoop.fs.LocalFileSystem@1d96d872)
- field (class: at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)