Проблема с сериализацией при переименовании файла HDFS с использованием Scala Spark параллельно - PullRequest
0 голосов
/ 05 октября 2018

Я хочу переименовать файлы HDFS параллельно, используя spark.Но я получаю исключение сериализации, я упомянул исключение после моего кода.Я получаю эту проблему при использовании spark.sparkContext.parallelize.Также я могу переименовать все файлы, когда делаю это в цикле.

  def renameHdfsToS3(spark : SparkSession, hdfsFolder :String, outputFileName:String,
                     renameFunction: (String,String) => String, bktOutput:String, folderOutput:String, kmsKey:String): Boolean = {
    try {
      val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
      val path = new Path(hdfsFolder)
      val files = fs.listStatus(path)
        .filter(fs => fs.isFile)

      val parallelRename=spark.sparkContext.parallelize(files).map(
        f=>{
          parallelRenameHdfs(fs,outputFileName,renamePartFileWithTS,f)
        }
      )
      val hdfsTopLevelPath=fs.getWorkingDirectory()+"/"+hdfsFolder
      return true
    } catch {
      case NonFatal(e) => {
        e.printStackTrace()
        return false
      }
    }
  }

Ниже приведено исключение, которое я получаю

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)

Caused by: java.io.NotSerializableException: org.apache.hadoop.fs.LocalFileSystem
Serialization stack:
    - object not serializable (class: org.apache.hadoop.fs.LocalFileSystem, value: org.apache.hadoop.fs.LocalFileSystem@1d96d872)
    - field (class:         at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)

1 Ответ

0 голосов
/ 06 октября 2018

Подход неверен, так как sc.parallelize предназначен для потребления данных через RDD.Вы должны работать на уровне операционной системы.Существует много таких сообщений.

Что-то вроде этого должно быть достаточно, смешивая это с вашей собственной логикой, например, примечание, которое позволяет параллельную обработку, например:

originalpath.par.foreach( e => hdfs.rename(e,e.suffix("finish")))

Вам необходимо проверить, как определяется параллелизм.par.Смотрите здесь https://docs.scala -lang.org / обзор / параллельные коллекции / configuration.html

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...