Я читаю текстовый файл, и это файл фиксированной ширины, который мне нужно преобразовать в CSV.Моя программа отлично работает на локальном компьютере, но когда я запускаю ее на кластере, она выдает исключение «Задача не сериализуема».
Я пытался решить ту же проблему с map и mapPartition.
Работает нормальнос помощью toLocalIterator на RDD.Но он не работает с большими файлами (у меня есть файлы размером 8 ГБ)
Ниже приведен код с использованием mapPartition, который я недавно пытался
// прочитать исходный файл и создать RDD
def main(){
var inpData = sc.textFile(s3File)
LOG.info(s"\n inpData >>>>>>>>>>>>>>> [${inpData.count()}]")
val rowRDD = inpData.mapPartitions(iter=>{
var listOfRow = new ListBuffer[Row]
while(iter.hasNext){
var line = iter.next()
if(line.length() >= maxIndex){
listOfRow += getRow(line,indexList)
}else{
counter+=1
}
}
listOfRow.toIterator
})
rowRDD .foreach(println)
}
case class StartEnd(startingPosition: Int, endingPosition: Int) extends Serializable
def getRow(x: String, inst: List[StartEnd]): Row = {
val columnArray = new Array[String](inst.size)
for (f <- 0 to inst.size - 1) {
columnArray(f) = x.substring(inst(f).startingPosition, inst(f).endingPosition)
}
Row.fromSeq(columnArray)
}
// Примечание: для вашей ссылки, indexList я создал с помощью класса case StartEnd, который выглядит как показано ниже после создания
[List(StartEnd(0,4), StartEnd(4,10), StartEnd(7,12), StartEnd(10,14))]
Эта программа отлично работает на моем локальном компьютере.Но когда я включаю кластер (AWS), он выдает исключение, как показано ниже.
17:24:10.947 [Driver] ERROR bms.edl.dt.transform.FileConversion.convertFixedWidthToCsv - Exception [Task not serializable]
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340) ~[glue-assembly.jar:?]
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:330) ~[glue-assembly.jar:?]
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156) ~[glue-assembly.jar:?]
at org.apache.spark.SparkContext.clean(SparkContext.scala:2294) ~[glue-assembly.jar:?]
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:794) ~[glue-assembly.jar:?]
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793) ~[glue-assembly.jar:?]
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) ~[glue-assembly.jar:?]
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) ~[glue-assembly.jar:?]
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) ~[glue-assembly.jar:?]
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:793) ~[glue-assembly.jar:?]
Caused by: java.io.NotSerializableException: sun.nio.cs.UTF_8
Serialization stack:
- object not serializable (class: sun.nio.cs.UTF_8, value: UTF-8)
- field (class: org.apache.logging.log4j.core.layout.AbstractStringLayout, name: charset, type: class java.nio.charset.Charset)
- object (class org.apache.logging.log4j.core.layout.PatternLayout, %d{HH:mm:ss.SSS} [%t] %-5level %logger{5}.%M - %msg%n)
- field (class: org.apache.logging.log4j.core.appender.AbstractAppender, name: layout, type: interface org.apache.logging.log4j.core.Layout)
- object (class org.apache.logging.log4j.core.appender.ConsoleAppender, STDOUT)
- writeObject data (class: java.util.concurrent.ConcurrentHashMap)
- object (class java.util.concurrent.ConcurrentHashMap, {STDOUT=STDOUT})
- field (class: org.apache.logging.log4j.core.config.AbstractConfiguration, name: appenders, type: interface java.util.concurrent.ConcurrentMap)
- object (class org.apache.logging.log4j.core.config.xml.XmlConfiguration, XmlConfiguration[location=jar:file:/mnt/yarn/usercache/root/filecache/163/edl-dt-1.9-SNAPSHOT.jar!/log4j2.xml])
- field (class: org.apache.logging.log4j.core.LoggerContext, name: configuration, type: interface org.apache.logging.log4j.core.config.Configuration)
- object (class org.apache.logging.log4j.core.LoggerContext, org.apache.logging.log4j.core.LoggerContext@418bb61f)
- field (class: org.apache.logging.log4j.core.Logger, name: context, type: class org.apache.logging.log4j.core.LoggerContext)
- object (class org.apache.logging.log4j.core.Logger, com.bms.edl.dt.transform.FileConversion:TRACE in 681842940)
- field (class: com.bms.edl.dt.transform.FileConversion, name: LOG, type: interface org.apache.logging.log4j.Logger)
- object (class com.bms.edl.dt.transform.FileConversion, com.bms.edl.dt.transform.FileConversion@984ddbb)
- field (class: com.bms.edl.dt.transform.FileConversion$$anonfun$7, name: $outer, type: class com.bms.edl.dt.transform.FileConversion)
- object (class com.bms.edl.dt.transform.FileConversion$$anonfun$7, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) ~[glue-assembly.jar:?]
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) ~[glue-assembly.jar:?]
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) ~[glue-assembly.jar:?]
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:337) ~[glue-assembly.jar:?]
... 71 more
17:24:10.954 [Driver] TRACE bms.edl.dt.transform.FileConversion.convertFixedWidthToCsv - Exit
17:24:10.954 [Driver] INFO bms.edl.dt.transform.FileConversion.apply - counterMap>>>>>>>>>Map(ResultantDF -> [], ExceptionString ->
Exception occurred while applying the FileConversion transformation and the exception Message is :Task not serializable)
17:24:11.692 [Driver] INFO bms.edl.dt.transform.FileConversion.apply - df count >>>>>>>>>0
17:24:11.692 [Driver] INFO bms.edl.dt.transform.FileConversion.apply - THERE WAS AN EXCEPTION FIX WIDTHING
17:24:11.692 [Driver] INFO bms.edl.dt.transform.FileConversion.dataTransform - THERE WAS AN EXCEPTION -- sb is not empty
17:24:11.693 [Driver] TRACE bms.edl.dt.transform.FileConversion.dataTransform - Exit
17:24:11.693 [Driver] INFO bms.edl.dt.transform.FileConversion.dataTransform - result>>>>>>>>Map(ResultantDF -> [], ExceptionString ->
Exception occurred while applying the FileConversion transformation and the exception Message is :Task not serializable
Exception occurred while applying the FileConversion transformation and the exception Message is :Task not serializable)
17:24:11.693 [Driver] TRACE edl.core.services.reflection.ReflectionInvoker$.invokeDTMethod - Exit
Я не могу понять, что здесь не так, а что не сериализуемо, почему оно вызывает исключение.
Любая помощь приветствуется.Заранее спасибо!