/ Что я пытаюсь /
Я хочу выполнить преобразование Spark UDF для нескольких блоков HDFS, содержащих файлы BZ2. Я определил объект MyMain Scala, который extends Serializable
, поскольку он включает вызов преобразования UDF для каждого из этих сегментов HDFS.
Однако, прежде чем выполнять преобразования UDF, мне нужно отфильтровать сегменты HDFS, которые на самом деле содержат некоторые файлы BZ2. Для этого нужны операции Hadoop FileSystem, которые я сохранил в методе MyMain.main, чтобы ограничить эти вычисления в памяти драйвера и не распространять их на рабочие узлы, поскольку, как я понимаю, FileSystem не сериализуема.
Однако дажепосле того как я создал отдельный сериализуемый класс HadoopUtils и создал одноэлементный объект-компаньон и вызвал все операции FileSystem в MyMain.main, я все еще получаю исключение «Задача не сериализуема» (ниже)
/ Вопрос (с)) /
Как вызывать несериализуемые операции FileSystem из объекта Serializable, такого как MyMain? Кроме того, class HadoopUtils extends Serializable
не выглядит сериализуемым, хотя определено как таковое?
/ Мой код /
val prependtoList = (x1: String, x2: List[String]) => x2.map(x1+_)
class HadoopUtils extends Serializable {
def existsDir(fs: FileSystem, path: String) : Boolean = {
val p = new Path(path)
fs.exists(p) && fs.getFileStatus(p).isDirectory
}
def ifBZFileExists(fs: FileSystem, bucketBZDir: String) : Boolean = {
val path = new Path(bucketBZDir)
val fileStatus = fs.listStatus(path).filter(
p => { p.isFile && p.getPath.getName.endsWith(".bz2")}
)
!fileStatus.isEmpty
}
def getBZ2Buckets(fs: FileSystem, lookupPath: String) : List[String] = {
//Filter the list of buckets having at least one BZ2 file in it
val range = (1 to 16).toList.map(x => x.toString)
val buckets = prependtoList("Bucket",range)
val allBuckets = prependtoList(lookupPath + "/", buckets)
//From Bucket1 to Bucket16, filter the buckets that are existing e.g. Bucket5 may not exist
val existingBuckets = allBuckets.filter(p => { existsDir(fs,p) })
val BZ2BucketPaths = existingBuckets.filter(path => { ifBZFileExists(fs,path) }).map(
path => { path + "/*.bz2" })
BZ2BucketPaths
}
}
object HadoopUtils {
val getHadoopUtils = new HadoopUtils
}
object MyMain extends Serializable {
val clusterNameNodeURL = "hdfs://mycluster.domain.com:8020"
val basePath = "/path/to/buckets"
def main(args: Array[String]): Unit = {
//NOTE: spark, hadoopfs defined in main so as to be processed in Driver
val spark = SparkSession
.builder()
.appName("My_App")
.enableHiveSupport()
.getOrCreate()
val hadoopfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val BZ2Buckets =
HadoopUtils.getHadoopUtils.getBZ2BucketPaths(hadoopfs,clusterNameNodeURL + basePath)
BZ2Buckets.foreach(path => {
//Doing Spark UDF transformations on each bucket, which needs to be serialized
})
}
}
/ Stack Trace of Exception /
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:616)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
at org.apache.spark.sql.Dataset.show(Dataset.scala:747)
at org.apache.spark.sql.Dataset.show(Dataset.scala:724)
at MyMain$.main(<pastie>:197)
... 51 elided
Caused by: java.io.NotSerializableException: HadoopUtils$
Serialization stack:
- object not serializable (class: HadoopUtils$, value: HadoopUtils$@7f5bab61)
- field (class: $iw, name: HadoopUtils$module, type: class HadoopUtils$)
- object (class $iw, $iw@3f4a0d43)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@74d06d1e)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@f9764ea)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@6821099e)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@4f509444)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@11462802)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@11d2d501)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@284fd700)
- field (class: $line14.$read, name: $iw, type: class $iw)
- object (class $line14.$read, $line14.$read@46b4206a)
- field (class: $iw, name: $line14$read, type: class $line14.$read)
- object (class $iw, $iw@33486894)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@25980fc9)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@1fb0d28d)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@42ea11d5)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@42d28cc1)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@22131a73)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@631878e1)
- field (class: $line18.$read, name: $iw, type: class $iw)
- object (class $line18.$read, $line18.$read@561c52c0)
- field (class: $iw, name: $line18$read, type: class $line18.$read)
- object (class $iw, $iw@1d5b8be2)
- field (class: $iw, name: $outer, type: class $iw)
- object (class $iw, $iw@4de4c672)
- field (class: $anonfun$1, name: $outer, type: class $iw)
- object (class $anonfun$1, <function2>)
- element of array (index: 9)
- array (class [Ljava.lang.Object;, size 15)
- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, name: references$1, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, <function2>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
... 85 more