Каков наилучший способ вызова операций файловой системы Hadoop из объекта Serializable Scala? - PullRequest
0 голосов
/ 28 октября 2019

/ Что я пытаюсь /

Я хочу выполнить преобразование Spark UDF для нескольких блоков HDFS, содержащих файлы BZ2. Я определил объект MyMain Scala, который extends Serializable, поскольку он включает вызов преобразования UDF для каждого из этих сегментов HDFS.

Однако, прежде чем выполнять преобразования UDF, мне нужно отфильтровать сегменты HDFS, которые на самом деле содержат некоторые файлы BZ2. Для этого нужны операции Hadoop FileSystem, которые я сохранил в методе MyMain.main, чтобы ограничить эти вычисления в памяти драйвера и не распространять их на рабочие узлы, поскольку, как я понимаю, FileSystem не сериализуема.

Однако дажепосле того как я создал отдельный сериализуемый класс HadoopUtils и создал одноэлементный объект-компаньон и вызвал все операции FileSystem в MyMain.main, я все еще получаю исключение «Задача не сериализуема» (ниже)

/ Вопрос (с)) /

Как вызывать несериализуемые операции FileSystem из объекта Serializable, такого как MyMain? Кроме того, class HadoopUtils extends Serializable не выглядит сериализуемым, хотя определено как таковое?

/ Мой код /

val prependtoList = (x1: String, x2: List[String]) => x2.map(x1+_)

class HadoopUtils extends Serializable {

  def existsDir(fs: FileSystem, path: String) : Boolean = {
    val p = new Path(path)
    fs.exists(p) && fs.getFileStatus(p).isDirectory
  }
  def ifBZFileExists(fs: FileSystem, bucketBZDir: String) : Boolean = {
    val path = new Path(bucketBZDir)
    val fileStatus = fs.listStatus(path).filter(
      p => { p.isFile && p.getPath.getName.endsWith(".bz2")}
    )
    !fileStatus.isEmpty
  }

  def getBZ2Buckets(fs: FileSystem, lookupPath: String) : List[String] = {
    //Filter the list of buckets having at least one BZ2 file in it
    val range = (1 to 16).toList.map(x => x.toString)
    val buckets = prependtoList("Bucket",range)
    val allBuckets = prependtoList(lookupPath + "/", buckets)
    //From Bucket1 to Bucket16, filter the buckets that are existing e.g. Bucket5 may not exist
    val existingBuckets = allBuckets.filter(p => { existsDir(fs,p) })
    val BZ2BucketPaths = existingBuckets.filter(path => { ifBZFileExists(fs,path) }).map(
        path => { path + "/*.bz2" })
    BZ2BucketPaths
  }
}

object HadoopUtils {
  val getHadoopUtils = new HadoopUtils
}

object MyMain extends Serializable {
  val clusterNameNodeURL = "hdfs://mycluster.domain.com:8020"
  val basePath = "/path/to/buckets"
  def main(args: Array[String]): Unit = {
    //NOTE: spark, hadoopfs defined in main so as to be processed in Driver
    val spark = SparkSession
      .builder()
      .appName("My_App")
      .enableHiveSupport()
      .getOrCreate()

    val hadoopfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)

    val BZ2Buckets = 
      HadoopUtils.getHadoopUtils.getBZ2BucketPaths(hadoopfs,clusterNameNodeURL + basePath)

    BZ2Buckets.foreach(path => {
      //Doing Spark UDF transformations on each bucket, which needs to be serialized
    })


  }
}

/ Stack Trace of Exception /

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:616)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
  at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:747)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:724)
  at MyMain$.main(<pastie>:197)
  ... 51 elided
Caused by: java.io.NotSerializableException: HadoopUtils$
Serialization stack:
    - object not serializable (class: HadoopUtils$, value: HadoopUtils$@7f5bab61)
    - field (class: $iw, name: HadoopUtils$module, type: class HadoopUtils$)
    - object (class $iw, $iw@3f4a0d43)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@74d06d1e)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@f9764ea)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@6821099e)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@4f509444)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@11462802)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@11d2d501)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@284fd700)
    - field (class: $line14.$read, name: $iw, type: class $iw)
    - object (class $line14.$read, $line14.$read@46b4206a)
    - field (class: $iw, name: $line14$read, type: class $line14.$read)
    - object (class $iw, $iw@33486894)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@25980fc9)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@1fb0d28d)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@42ea11d5)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@42d28cc1)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@22131a73)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@631878e1)
    - field (class: $line18.$read, name: $iw, type: class $iw)
    - object (class $line18.$read, $line18.$read@561c52c0)
    - field (class: $iw, name: $line18$read, type: class $line18.$read)
    - object (class $iw, $iw@1d5b8be2)
    - field (class: $iw, name: $outer, type: class $iw)
    - object (class $iw, $iw@4de4c672)
    - field (class: $anonfun$1, name: $outer, type: class $iw)
    - object (class $anonfun$1, <function2>)
    - element of array (index: 9)
    - array (class [Ljava.lang.Object;, size 15)
    - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, name: references$1, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, <function2>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
  ... 85 more

1 Ответ

0 голосов
/ 15 ноября 2019

Кажется, проблема Task not serializable не связана ни с классом HadoopUtils, ни с объектом. Учитывая, что в программе драйвера доступ к экземпляру класса HadoopUtils осуществляется через singleton HadoopUtils object, то есть HadoopUtils.getHadoopUtil, класс HadoopUtils необходимо сериализовать вместе с MyMain object.

Решениена проблему можно сослаться здесь

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...